prosodic.utils

  1from typing import Any, Callable, Dict, List, Optional, Tuple, Union
  2from .imports import *
  3
  4class SimpleCache:
  5    """A simple file-based caching system.
  6
  7    This class provides a dictionary-like interface for caching objects to disk.
  8    It uses a two-level directory structure to organize cached files.
  9
 10    Attributes:
 11        root_dir (str): The root directory for storing cached files.
 12    """
 13
 14    def __init__(self, root_dir: str = PATH_HOME_DATA_CACHE) -> None:
 15        """Initialize the SimpleCache.
 16
 17        Args:
 18            root_dir: The root directory for storing cached files.
 19        """
 20        self.root_dir = root_dir
 21        os.makedirs(root_dir, exist_ok=True)
 22
 23    def _get_file_path(self, key: str) -> str:
 24        """Get the file path for a given key.
 25
 26        Args:
 27            key: The cache key.
 28
 29        Returns:
 30            The file path for the given key.
 31        """
 32        # Use the first 2 characters for the first level directory
 33        # and the next 2 characters for the second level directory
 34        dir1 = key[:2]
 35        dir2 = key[2:4]
 36        file_name = key[4:]
 37        
 38        dir_path = os.path.join(self.root_dir, dir1, dir2)
 39        os.makedirs(dir_path, exist_ok=True)
 40        
 41        return os.path.join(dir_path, file_name)
 42
 43    def __setitem__(self, key: str, value: Any) -> None:
 44        """Set an item in the cache.
 45
 46        Args:
 47            key: The cache key.
 48            value: The value to cache.
 49        """
 50        file_path = self._get_file_path(key)
 51        with open(file_path, 'wb') as f:
 52            f.write(encode_cache(value))
 53
 54    def __getitem__(self, key: str) -> Any:
 55        """Get an item from the cache.
 56
 57        Args:
 58            key: The cache key.
 59
 60        Returns:
 61            The cached value.
 62
 63        Raises:
 64            KeyError: If the key is not found in the cache.
 65        """
 66        file_path = self._get_file_path(key)
 67        if not os.path.exists(file_path):
 68            raise KeyError(key)
 69        with open(file_path, 'rb') as f:
 70            return decode_cache(f.read())
 71
 72    def __contains__(self, key: str) -> bool:
 73        """Check if a key exists in the cache.
 74
 75        Args:
 76            key: The cache key.
 77
 78        Returns:
 79            True if the key exists, False otherwise.
 80        """
 81        return os.path.exists(self._get_file_path(key))
 82
 83    def get(self, key: str, default: Any = None) -> Any:
 84        """Get an item from the cache with a default value.
 85
 86        Args:
 87            key: The cache key.
 88            default: The default value to return if the key is not found.
 89
 90        Returns:
 91            The cached value or the default value.
 92        """
 93        try:
 94            return self[key]
 95        except KeyError:
 96            return default
 97
 98def retry_on_io_error(max_attempts: int = 3, delay: float = 0.1) -> Callable:
 99    """Decorator to retry a function on IOError.
100
101    Args:
102        max_attempts: Maximum number of retry attempts.
103        delay: Delay between retry attempts in seconds.
104
105    Returns:
106        A decorator function.
107    """
108    def decorator(func):
109        @wraps(func)
110        def wrapper(*args, **kwargs):
111            for attempt in range(max_attempts):
112                try:
113                    return func(*args, **kwargs)
114                except IOError as e:
115                    if attempt < max_attempts - 1:
116                        time.sleep(delay)
117                    else:
118                        raise
119        return wrapper
120    return decorator
121
122
def group_ents(l: List[Any], feat: str) -> List[List[Any]]:
    """Group entities based on a common feature.

    Consecutive entities whose ``feat`` attribute is the same object are
    collected into one group.

    Args:
        l: List of entities to group.
        feat: The feature to group by.

    Returns:
        A list of grouped entities.
    """
    groups = []
    current = []
    prev_val = None
    for ent in l:
        val = getattr(ent, feat)
        # Identity comparison (`is not`), matching the original logic.
        if current and val is not prev_val:
            groups.append(current)
            current = []
        current.append(ent)
        prev_val = val
    if current:
        groups.append(current)
    return groups
146
147
def groupby(df: pd.DataFrame, groupby: Union[str, List[str]]) -> pd.core.groupby.DataFrameGroupBy:
    """Group a DataFrame by specified columns.

    Grouping keys may name index levels or regular columns; any requested
    key not present in the frame is silently dropped.

    Args:
        df: The DataFrame to group.
        groupby: Column name(s) to group by.

    Returns:
        A grouped DataFrame.

    Raises:
        Exception: If no valid grouping columns are found.
    """
    # Candidate keys: index level names, the single index name, and columns.
    allcols = set(df.index.names) | {df.index.name} | set(df.columns)
    # isinstance (not type ==) so str subclasses are handled too.
    if isinstance(groupby, str):
        groupby = [groupby]
    gby = [g for g in groupby if g in allcols]
    if not gby:
        raise Exception("No group after filter")
    return df.groupby(gby)
168
169
def get_txt(txt: Optional[str], fn: Optional[str]) -> str:
    """Get text content from a string, file, or URL.

    Args:
        txt: Text content or None.
        fn: Filename or URL or None.

    Returns:
        The text content; empty string when nothing can be resolved.
    """
    if txt:
        # A "text" argument that is really a URL or an existing path is
        # re-dispatched through the filename branch.
        if txt.startswith("http") or os.path.exists(txt):
            return get_txt(None, txt)
        return txt

    if fn:
        if fn.startswith("http"):
            return requests.get(fn).text.strip()
        if os.path.exists(fn):
            with open(fn, encoding='utf-8') as file:
                return file.read()

    return ""
196
197
def clean_text(txt: str) -> str:
    """Clean and normalize text.

    Args:
        txt: The input text.

    Returns:
        Cleaned and normalized text.
    """
    # Normalize Windows/old-Mac line endings to "\n", then repair mojibake.
    normalized = txt.replace("\r\n", "\n").replace("\r", "\n")
    return ftfy.fix_text(normalized)
210
211
def get_attr_str(attrs: Dict[str, Any], sep: str = ", ", bad_keys: Optional[List[str]] = None) -> str:
    """Generate a string representation of attributes.

    Entries with a None value, and any keys listed in ``bad_keys``,
    are omitted.

    Args:
        attrs: Dictionary of attributes.
        sep: Separator between attribute strings.
        bad_keys: List of keys to exclude.

    Returns:
        A string representation of the attributes.
    """
    excluded = set(bad_keys) if bad_keys else set()
    parts = []
    for key, value in attrs.items():
        if value is None or key in excluded:
            continue
        parts.append(f"{key}={repr(value)}")
    return sep.join(parts)
230
231
def safesum(l: List[Union[int, float]]) -> Union[int, float]:
    """Safely sum a list of numbers, ignoring non-numeric values.

    Args:
        l: List of numbers to sum.

    Returns:
        The sum of the numeric values in the list.
    """
    # Exact type membership (not isinstance) so bool and other subclasses
    # stay excluded, matching the original behavior.
    numeric_types = {int, float, np.float64, np.float32}
    return sum(x for x in l if type(x) in numeric_types)
243
244
def setindex(df: pd.DataFrame, cols: Optional[List[str]] = None) -> pd.DataFrame:
    """Set the index of a DataFrame to specified columns.

    Column names not present in the DataFrame are ignored; if none
    remain (or ``cols`` is empty/None), the DataFrame is returned
    unchanged.

    Args:
        df: The input DataFrame.
        cols: List of column names to set as index.

    Returns:
        The DataFrame with the new index set.
    """
    # Default is None rather than a shared mutable [] default; both mean
    # "leave the index unchanged", so callers are unaffected.
    if not cols:
        return df
    valid = [c for c in cols if c in set(df.columns)]
    return df.set_index(valid) if valid else df
259
260
261
def get_stress(ipa: str) -> str:
    """Get the stress level from an IPA string.

    Args:
        ipa: The IPA string.

    Returns:
        The stress level: 'S' (secondary), 'P' (primary), or 'U'
        (unstressed); empty string for empty input.
    """
    if not ipa:
        return ""
    # Leading backtick marks secondary stress, apostrophe primary.
    return {"`": "S", "'": "P"}.get(ipa[0], "U")
278
279
def get_initial_whitespace(xstr: str) -> str:
    """Get the initial whitespace from a string.

    Args:
        xstr: The input string.

    Returns:
        The leading run of whitespace characters (possibly empty).
    """
    leading = []
    for ch in xstr:
        # A character that survives strip() unchanged is not whitespace,
        # so the leading run ends here.
        if ch == ch.strip():
            break
        leading.append(ch)
    return "".join(leading)
295
296
def unique(l: List[Any]) -> List[Any]:
    """Get unique elements from a list while preserving order.

    Args:
        l: The input list (elements must be hashable).

    Returns:
        A list of unique elements, keeping the first occurrence of each.
    """
    # dict preserves insertion order (Python 3.7+), giving the same
    # ordered de-duplication as OrderedSet without the third-party
    # ordered_set dependency.
    return list(dict.fromkeys(l))
309
310
def hashstr(*inputs: Any, length: int = HASHSTR_LEN) -> str:
    """Generate a hash string from inputs.

    Args:
        *inputs: Input values to hash.
        length: Length of the output hash string.

    Returns:
        A hash string: the truncated hex SHA-256 digest of the
        stringified inputs tuple.
    """
    import hashlib

    digest = hashlib.sha256(str(inputs).encode()).hexdigest()
    return digest[:length]
326
327
def read_json(fn: str) -> Dict[str, Any]:
    """Read a JSON file.

    Args:
        fn: The filename.

    Returns:
        The parsed JSON data as a dictionary; an empty dict if the file
        does not exist.
    """
    if not os.path.exists(fn):
        return {}
    with open(fn, encoding='utf-8') as file:
        return orjson.loads(file.read())
341
342
def from_json(json_d: Union[str, Dict[str, Any]], **kwargs: Any) -> Any:
    """Create an object from JSON data.

    Args:
        json_d: JSON data dict, or a filename to read JSON from.
        **kwargs: Additional keyword arguments passed to the class's
            ``from_json`` constructor.

    Returns:
        The created object.

    Raises:
        Exception: If the JSON data doesn't contain a '_class' key.
    """
    from .imports import GLOBALS

    # A string argument is treated as a filename.
    if isinstance(json_d, str):
        json_d = read_json(json_d)
    if "_class" not in json_d:
        pprint(json_d)
        raise Exception("JSON data missing required '_class' key")
    classname = json_d["_class"]
    classx = GLOBALS[classname]
    return classx.from_json(json_d, **kwargs)
366
367
def load(fn: str, **kwargs: Any) -> Any:
    """Load an object from a JSON file.

    Thin convenience wrapper that delegates to ``from_json``.

    Args:
        fn: The filename.
        **kwargs: Additional keyword arguments passed through.

    Returns:
        The loaded object.
    """
    return from_json(fn, **kwargs)
379
380
def to_json(obj: Any, fn: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Convert an object to JSON and optionally save to a file.

    Args:
        obj: The object to convert (uses its ``to_json`` method if present).
        fn: The filename to save to (optional).

    Returns:
        The JSON data if fn is None, otherwise None.
    """
    # Objects that know how to serialize themselves take precedence.
    data = obj.to_json() if hasattr(obj, "to_json") else obj

    if not fn:
        return data

    fdir = os.path.dirname(fn)
    if fdir:
        os.makedirs(fdir, exist_ok=True)
    with open(fn, "wb") as of:
        of.write(
            orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_SERIALIZE_NUMPY)
        )
408
409
def ensure_dir(fn: str) -> None:
    """Ensure that the directory for a file exists.

    A bare filename with no directory component is a no-op.

    Args:
        fn: The filename.
    """
    parent = os.path.dirname(fn)
    if parent:
        os.makedirs(parent, exist_ok=True)
419
420
421
def encode_cache(x: Any) -> bytes:
    """Encode an object for caching.

    Serializes with orjson, compresses with zlib, then base64-encodes.

    Args:
        x: The object to encode.

    Returns:
        The encoded object as bytes.
    """
    serialized = orjson.dumps(x, option=orjson.OPT_SERIALIZE_NUMPY)
    return b64encode(zlib.compress(serialized))
439
440
def decode_cache(x: bytes) -> Any:
    """Decode a cached object.

    Reverses ``encode_cache``: base64-decode, zlib-decompress, then
    parse with orjson.

    Args:
        x: The encoded object.

    Returns:
        The decoded object.
    """
    decompressed = zlib.decompress(b64decode(x))
    return orjson.loads(decompressed)
457
458
def to_html(html: Union[str, Any], as_str: bool = False, **kwargs: Any) -> Union[str, Any]:
    """Convert an object to HTML.

    Args:
        html: An HTML string, or an object with a ``to_html`` method.
        as_str: Whether to return as a plain string.
        **kwargs: Additional keyword arguments passed to ``to_html``.

    Returns:
        The HTML representation of the object; None for unsupported input.
    """
    # Exact `type(...) is not str` check (not isinstance) so str
    # subclasses are still dispatched via their own to_html().
    if type(html) is not str:
        if hasattr(html, "to_html"):
            return html.to_html(as_str=as_str, **kwargs)
        logger.error(f"what type of data is this? {html}")
        return

    if as_str:
        return html

    # Prefer a rich display object in notebooks; fall back to the raw
    # string when IPython is unavailable.
    try:
        from IPython.display import HTML

        return HTML(html)
    except ModuleNotFoundError:
        return html
485
486
def enable_caching() -> None:
    """Turn the module-wide cache flag on."""
    global USE_CACHE
    USE_CACHE = True
491
492
def caching_is_enabled() -> bool:
    """Report the current state of the module-wide cache flag.

    Returns:
        True if caching is enabled, False otherwise.
    """
    return USE_CACHE
500
501
def disable_caching() -> None:
    """Turn the module-wide cache flag off."""
    global USE_CACHE
    USE_CACHE = False
506
507
@contextmanager
def caching_enabled() -> Iterator[None]:
    """Context manager for temporarily enabling caching.

    The previous caching state is restored on exit, even if the body
    raises (the original only restored on a clean exit).
    """
    was_enabled = caching_is_enabled()
    enable_caching()
    try:
        yield
    finally:
        if not was_enabled:
            disable_caching()
516
517
@contextmanager
def caching_disabled() -> Iterator[None]:
    """Context manager for temporarily disabling caching.

    The previous caching state is restored on exit, even if the body
    raises (the original only restored on a clean exit).
    """
    was_enabled = caching_is_enabled()
    disable_caching()
    try:
        yield
    finally:
        if was_enabled:
            enable_caching()
526
527
@contextmanager
def logging_disabled() -> Iterator[None]:
    """Context manager for temporarily disabling logging.

    The previous quiet state is restored on exit, even if the body
    raises (the original only restored on a clean exit).
    """
    was_quiet = logmap.is_quiet
    logmap.is_quiet = True
    try:
        yield
    finally:
        logmap.is_quiet = was_quiet
535
536
@contextmanager
def logging_enabled() -> Iterator[None]:
    """Context manager for temporarily enabling logging.

    The previous quiet state is restored on exit, even if the body
    raises (the original only restored on a clean exit).
    """
    was_quiet = logmap.is_quiet
    logmap.is_quiet = False
    try:
        yield
    finally:
        logmap.is_quiet = was_quiet
544
545
def force_int(x: Any, errors: int = 0) -> int:
    """Convert the input to an integer.

    Args:
        x: The input value to be converted to an integer.
        errors: The value to be returned in case of an error. Defaults to 0.

    Returns:
        ``int(x)`` when conversion succeeds, otherwise ``errors``.
    """
    try:
        result = int(x)
    except (ValueError, TypeError):
        return errors
    return result
560
def tokenize_agnostic(txt: str) -> List[str]:
    """Tokenize text in a language-agnostic way.

    Args:
        txt: The input text.

    Returns:
        A list of tokens: word-like runs (including apostrophes) plus
        single punctuation/whitespace characters.
    """
    # NOTE(review): inside the character class, " -—" parses as a RANGE
    # (U+0020 through U+2014), which matches far more single characters
    # than the listed punctuation — confirm this is intended before
    # changing the pattern.
    pattern = r"[\w']+|[.,!?; -—–'\n]"
    return re.findall(pattern, txt)
class SimpleCache:
 5class SimpleCache:
 6    """A simple file-based caching system.
 7
 8    This class provides a dictionary-like interface for caching objects to disk.
 9    It uses a two-level directory structure to organize cached files.
10
11    Attributes:
12        root_dir (str): The root directory for storing cached files.
13    """
14
15    def __init__(self, root_dir: str = PATH_HOME_DATA_CACHE) -> None:
16        """Initialize the SimpleCache.
17
18        Args:
19            root_dir: The root directory for storing cached files.
20        """
21        self.root_dir = root_dir
22        os.makedirs(root_dir, exist_ok=True)
23
24    def _get_file_path(self, key: str) -> str:
25        """Get the file path for a given key.
26
27        Args:
28            key: The cache key.
29
30        Returns:
31            The file path for the given key.
32        """
33        # Use the first 2 characters for the first level directory
34        # and the next 2 characters for the second level directory
35        dir1 = key[:2]
36        dir2 = key[2:4]
37        file_name = key[4:]
38        
39        dir_path = os.path.join(self.root_dir, dir1, dir2)
40        os.makedirs(dir_path, exist_ok=True)
41        
42        return os.path.join(dir_path, file_name)
43
44    def __setitem__(self, key: str, value: Any) -> None:
45        """Set an item in the cache.
46
47        Args:
48            key: The cache key.
49            value: The value to cache.
50        """
51        file_path = self._get_file_path(key)
52        with open(file_path, 'wb') as f:
53            f.write(encode_cache(value))
54
55    def __getitem__(self, key: str) -> Any:
56        """Get an item from the cache.
57
58        Args:
59            key: The cache key.
60
61        Returns:
62            The cached value.
63
64        Raises:
65            KeyError: If the key is not found in the cache.
66        """
67        file_path = self._get_file_path(key)
68        if not os.path.exists(file_path):
69            raise KeyError(key)
70        with open(file_path, 'rb') as f:
71            return decode_cache(f.read())
72
73    def __contains__(self, key: str) -> bool:
74        """Check if a key exists in the cache.
75
76        Args:
77            key: The cache key.
78
79        Returns:
80            True if the key exists, False otherwise.
81        """
82        return os.path.exists(self._get_file_path(key))
83
84    def get(self, key: str, default: Any = None) -> Any:
85        """Get an item from the cache with a default value.
86
87        Args:
88            key: The cache key.
89            default: The default value to return if the key is not found.
90
91        Returns:
92            The cached value or the default value.
93        """
94        try:
95            return self[key]
96        except KeyError:
97            return default

A simple file-based caching system.

This class provides a dictionary-like interface for caching objects to disk. It uses a two-level directory structure to organize cached files.

Attributes:
  • root_dir (str): The root directory for storing cached files.
SimpleCache(root_dir: str = '/Users/ryan/prosodic_data/data/cache')
15    def __init__(self, root_dir: str = PATH_HOME_DATA_CACHE) -> None:
16        """Initialize the SimpleCache.
17
18        Args:
19            root_dir: The root directory for storing cached files.
20        """
21        self.root_dir = root_dir
22        os.makedirs(root_dir, exist_ok=True)

Initialize the SimpleCache.

Arguments:
  • root_dir: The root directory for storing cached files.
root_dir
def get(self, key: str, default: Any = None) -> Any:
84    def get(self, key: str, default: Any = None) -> Any:
85        """Get an item from the cache with a default value.
86
87        Args:
88            key: The cache key.
89            default: The default value to return if the key is not found.
90
91        Returns:
92            The cached value or the default value.
93        """
94        try:
95            return self[key]
96        except KeyError:
97            return default

Get an item from the cache with a default value.

Arguments:
  • key: The cache key.
  • default: The default value to return if the key is not found.
Returns:

The cached value or the default value.

def retry_on_io_error(max_attempts: int = 3, delay: float = 0.1) -> Callable:
 99def retry_on_io_error(max_attempts: int = 3, delay: float = 0.1) -> Callable:
100    """Decorator to retry a function on IOError.
101
102    Args:
103        max_attempts: Maximum number of retry attempts.
104        delay: Delay between retry attempts in seconds.
105
106    Returns:
107        A decorator function.
108    """
109    def decorator(func):
110        @wraps(func)
111        def wrapper(*args, **kwargs):
112            for attempt in range(max_attempts):
113                try:
114                    return func(*args, **kwargs)
115                except IOError as e:
116                    if attempt < max_attempts - 1:
117                        time.sleep(delay)
118                    else:
119                        raise
120        return wrapper
121    return decorator

Decorator to retry a function on IOError.

Arguments:
  • max_attempts: Maximum number of retry attempts.
  • delay: Delay between retry attempts in seconds.
Returns:

A decorator function.

def group_ents(l: List[Any], feat: str) -> List[List[Any]]:
124def group_ents(l: List[Any], feat: str) -> List[List[Any]]:
125    """Group entities based on a common feature.
126
127    Args:
128        l: List of entities to group.
129        feat: The feature to group by.
130
131    Returns:
132        A list of grouped entities.
133    """
134    val = None
135    ol = []
136    lx = []
137    for x in l:
138        valx = getattr(x, feat)
139        if valx is not val and lx:
140            ol.append(lx)
141            lx = []
142        lx.append(x)
143        val = valx
144    if lx:
145        ol.append(lx)
146    return ol

Group entities based on a common feature.

Arguments:
  • l: List of entities to group.
  • feat: The feature to group by.
Returns:

A list of grouped entities.

def groupby( df: pandas.core.frame.DataFrame, groupby: Union[str, List[str]]) -> pandas.core.groupby.generic.DataFrameGroupBy:
149def groupby(df: pd.DataFrame, groupby: Union[str, List[str]]) -> pd.core.groupby.DataFrameGroupBy:
150    """Group a DataFrame by specified columns.
151
152    Args:
153        df: The DataFrame to group.
154        groupby: Column name(s) to group by.
155
156    Returns:
157        A grouped DataFrame.
158
159    Raises:
160        Exception: If no valid grouping columns are found.
161    """
162    allcols = set(df.index.names) | {df.index.name} | set(df.columns)
163    if type(groupby) == str:
164        groupby = [groupby]
165    gby = [g for g in groupby if g in allcols]
166    if not gby:
167        raise Exception("No group after filter")
168    return df.groupby(gby)

Group a DataFrame by specified columns.

Arguments:
  • df: The DataFrame to group.
  • groupby: Column name(s) to group by.
Returns:

A grouped DataFrame.

Raises:
  • Exception: If no valid grouping columns are found.
def get_txt(txt: Optional[str], fn: Optional[str]) -> str:
171def get_txt(txt: Optional[str], fn: Optional[str]) -> str:
172    """Get text content from a string, file, or URL.
173
174    Args:
175        txt: Text content or None.
176        fn: Filename or URL or None.
177
178    Returns:
179        The text content.
180    """
181    if txt:
182        if txt.startswith("http") or os.path.exists(txt):
183            return get_txt(None, txt)
184
185        return txt
186
187    if fn:
188        if fn.startswith("http"):
189            response = requests.get(fn)
190            return response.text.strip()
191
192        if os.path.exists(fn):
193            with open(fn, encoding='utf-8') as f:
194                return f.read()
195
196    return ""

Get text content from a string, file, or URL.

Arguments:
  • txt: Text content or None.
  • fn: Filename or URL or None.
Returns:

The text content.

def clean_text(txt: str) -> str:
199def clean_text(txt: str) -> str:
200    """Clean and normalize text.
201
202    Args:
203        txt: The input text.
204
205    Returns:
206        Cleaned and normalized text.
207    """
208    txt = txt.replace("\r\n", "\n").replace("\r", "\n")
209    txt = ftfy.fix_text(txt)
210    return txt

Clean and normalize text.

Arguments:
  • txt: The input text.
Returns:

Cleaned and normalized text.

def get_attr_str( attrs: Dict[str, Any], sep: str = ', ', bad_keys: Optional[List[str]] = None) -> str:
213def get_attr_str(attrs: Dict[str, Any], sep: str = ", ", bad_keys: Optional[List[str]] = None) -> str:
214    """Generate a string representation of attributes.
215
216    Args:
217        attrs: Dictionary of attributes.
218        sep: Separator between attribute strings.
219        bad_keys: List of keys to exclude.
220
221    Returns:
222        A string representation of the attributes.
223    """
224    strs = [
225        f"{k}={repr(v)}"
226        for k, v in attrs.items()
227        if v is not None and (not bad_keys or not k in set(bad_keys))
228    ]
229    attrstr = sep.join(strs)
230    return attrstr

Generate a string representation of attributes.

Arguments:
  • attrs: Dictionary of attributes.
  • sep: Separator between attribute strings.
  • bad_keys: List of keys to exclude.
Returns:

A string representation of the attributes.

def safesum(l: List[Union[int, float]]) -> Union[int, float]:
233def safesum(l: List[Union[int, float]]) -> Union[int, float]:
234    """Safely sum a list of numbers, ignoring non-numeric values.
235
236    Args:
237        l: List of numbers to sum.
238
239    Returns:
240        The sum of the numeric values in the list.
241    """
242    l = [x for x in l if type(x) in {int, float, np.float64, np.float32}]
243    return sum(l)

Safely sum a list of numbers, ignoring non-numeric values.

Arguments:
  • l: List of numbers to sum.
Returns:

The sum of the numeric values in the list.

def setindex( df: pandas.core.frame.DataFrame, cols: List[str] = []) -> pandas.core.frame.DataFrame:
246def setindex(df: pd.DataFrame, cols: List[str] = []) -> pd.DataFrame:
247    """Set the index of a DataFrame to specified columns.
248
249    Args:
250        df: The input DataFrame.
251        cols: List of column names to set as index.
252
253    Returns:
254        The DataFrame with the new index set.
255    """
256    if not cols:
257        return df
258    cols = [c for c in cols if c in set(df.columns)]
259    return df.set_index(cols) if cols else df

Set the index of a DataFrame to specified columns.

Arguments:
  • df: The input DataFrame.
  • cols: List of column names to set as index.
Returns:

The DataFrame with the new index set.

def get_stress(ipa: str) -> str:
263def get_stress(ipa: str) -> str:
264    """Get the stress level from an IPA string.
265
266    Args:
267        ipa: The IPA string.
268
269    Returns:
270        The stress level ('S', 'P', or 'U').
271    """
272    if not ipa:
273        return ""
274    if ipa[0] == "`":
275        return "S"
276    if ipa[0] == "'":
277        return "P"
278    return "U"

Get the stress level from an IPA string.

Arguments:
  • ipa: The IPA string.
Returns:

The stress level ('S', 'P', or 'U').

def get_initial_whitespace(xstr: str) -> str:
281def get_initial_whitespace(xstr: str) -> str:
282    """Get the initial whitespace from a string.
283
284    Args:
285        xstr: The input string.
286
287    Returns:
288        The initial whitespace.
289    """
290    o = []
291    for i, x in enumerate(xstr):
292        if x == x.strip():
293            break
294        o.append(x)
295    return "".join(o)

Get the initial whitespace from a string.

Arguments:
  • xstr: The input string.
Returns:

The initial whitespace.

def unique(l: List[Any]) -> List[Any]:
298def unique(l: List[Any]) -> List[Any]:
299    """Get unique elements from a list while preserving order.
300
301    Args:
302        l: The input list.
303
304    Returns:
305        A list of unique elements.
306    """
307    from ordered_set import OrderedSet
308
309    return list(OrderedSet(l))

Get unique elements from a list while preserving order.

Arguments:
  • l: The input list.
Returns:

A list of unique elements.

def hashstr(*inputs: Any, length: int = None) -> str:
312def hashstr(*inputs: Any, length: int = HASHSTR_LEN) -> str:
313    """Generate a hash string from inputs.
314
315    Args:
316        *inputs: Input values to hash.
317        length: Length of the output hash string.
318
319    Returns:
320        A hash string.
321    """
322    import hashlib
323
324    input_string = str(inputs)
325    sha256_hash = hashlib.sha256(str(input_string).encode()).hexdigest()
326    return sha256_hash[:length]

Generate a hash string from inputs.

Arguments:
  • *inputs: Input values to hash.
  • length: Length of the output hash string.
Returns:

A hash string.

def read_json(fn: str) -> Dict[str, Any]:
329def read_json(fn: str) -> Dict[str, Any]:
330    """Read a JSON file.
331
332    Args:
333        fn: The filename.
334
335    Returns:
336        The parsed JSON data as a dictionary.
337    """
338    if not os.path.exists(fn):
339        return {}
340    with open(fn, encoding='utf-8') as f:
341        return orjson.loads(f.read())

Read a JSON file.

Arguments:
  • fn: The filename.
Returns:

The parsed JSON data as a dictionary.

def from_json(json_d: Union[str, Dict[str, Any]], **kwargs: Any) -> Any:
344def from_json(json_d: Union[str, Dict[str, Any]], **kwargs: Any) -> Any:
345    """Create an object from JSON data.
346
347    Args:
348        json_d: JSON data or filename.
349        **kwargs: Additional keyword arguments.
350
351    Returns:
352        The created object.
353
354    Raises:
355        Exception: If the JSON data doesn't contain a '_class' key.
356    """
357    from .imports import GLOBALS
358
359    if type(json_d) == str:
360        json_d = read_json(json_d)
361    if not "_class" in json_d:
362        pprint(json_d)
363        raise Exception
364    classname = json_d["_class"]
365    classx = GLOBALS[classname]
366    return classx.from_json(json_d, **kwargs)

Create an object from JSON data.

Arguments:
  • json_d: JSON data or filename.
  • **kwargs: Additional keyword arguments.
Returns:

The created object.

Raises:
  • Exception: If the JSON data doesn't contain a '_class' key.
def load(fn: str, **kwargs: Any) -> Any:
369def load(fn: str, **kwargs: Any) -> Any:
370    """Load an object from a JSON file.
371
372    Args:
373        fn: The filename.
374        **kwargs: Additional keyword arguments.
375
376    Returns:
377        The loaded object.
378    """
379    return from_json(fn, **kwargs)

Load an object from a JSON file.

Arguments:
  • fn: The filename.
  • **kwargs: Additional keyword arguments.
Returns:

The loaded object.

def to_json(obj: Any, fn: Optional[str] = None) -> Optional[Dict[str, Any]]:
382def to_json(obj: Any, fn: Optional[str] = None) -> Optional[Dict[str, Any]]:
383    """Convert an object to JSON and optionally save to a file.
384
385    Args:
386        obj: The object to convert.
387        fn: The filename to save to (optional).
388
389    Returns:
390        The JSON data if fn is None, otherwise None.
391    """
392    if hasattr(obj, "to_json"):
393        data = obj.to_json()
394    else:
395        data = obj
396
397    if not fn:
398        return data
399    else:
400        fdir = os.path.dirname(fn)
401        if fdir:
402            os.makedirs(fdir, exist_ok=True)
403        with open(fn, "wb") as of:
404            of.write(
405                orjson.dumps(
406                    data, option=orjson.OPT_INDENT_2 | orjson.OPT_SERIALIZE_NUMPY
407                )
408            )

Convert an object to JSON and optionally save to a file.

Arguments:
  • obj: The object to convert.
  • fn: The filename to save to (optional).
Returns:

The JSON data if fn is None, otherwise None.

def ensure_dir(fn: str) -> None:
411def ensure_dir(fn: str) -> None:
412    """Ensure that the directory for a file exists.
413
414    Args:
415        fn: The filename.
416    """
417    dirname = os.path.dirname(fn)
418    if dirname:
419        os.makedirs(dirname, exist_ok=True)

Ensure that the directory for a file exists.

Arguments:
  • fn: The filename.
def encode_cache(x: Any) -> bytes:
    """Serialize, compress, and base64-encode an object for cache storage.

    Args:
        x: The object to encode (must be orjson-serializable; numpy
           values are handled via ``OPT_SERIALIZE_NUMPY``).

    Returns:
        The encoded payload as bytes.
    """
    serialized = orjson.dumps(x, option=orjson.OPT_SERIALIZE_NUMPY)
    return b64encode(zlib.compress(serialized))

Encode an object for caching.

Arguments:
  • x: The object to encode.
Returns:

The encoded object as bytes.

def decode_cache(x: bytes) -> Any:
    """Reverse ``encode_cache``: base64-decode, decompress, and parse JSON.

    Args:
        x: The encoded payload.

    Returns:
        The decoded object.
    """
    compressed = b64decode(x)
    return orjson.loads(zlib.decompress(compressed))

Decode a cached object.

Arguments:
  • x: The encoded object.
Returns:

The decoded object.

def to_html(html: Union[str, Any], as_str: bool = False, **kwargs: Any) -> Union[str, Any]:
    """Convert an object to an HTML representation.

    Args:
        html: An HTML string, or an object exposing a ``to_html`` method.
        as_str: Whether to return the raw HTML string rather than a
            display object.
        **kwargs: Extra options forwarded to the object's ``to_html``.

    Returns:
        The HTML string, an IPython ``HTML`` display object, or ``None``
        when the input cannot be converted.
    """
    # Non-strings must render themselves; isinstance (not a type identity
    # check) is the idiomatic test here.
    if not isinstance(html, str):
        if hasattr(html, "to_html"):
            return html.to_html(as_str=as_str, **kwargs)
        logger.error(f"what type of data is this? {html}")
        return None

    if as_str:
        return html

    try:
        # Import lazily: IPython is only present in notebook environments.
        from IPython.display import HTML

        return HTML(html)
    except ModuleNotFoundError:
        # Outside IPython, fall back to the raw string.
        return html

Convert an object to HTML.

Arguments:
  • html: The object to convert.
  • as_str: Whether to return as a string.
  • **kwargs: Additional keyword arguments.
Returns:

The HTML representation of the object.

def enable_caching() -> None:
    """Turn on the global caching flag."""
    global USE_CACHE
    USE_CACHE = True

Enable caching.

def caching_is_enabled() -> bool:
    """Report whether the global caching flag is currently on.

    Returns:
        True if caching is enabled, False otherwise.
    """
    return USE_CACHE

Check if caching is enabled.

Returns:

True if caching is enabled, False otherwise.

def disable_caching() -> None:
    """Turn off the global caching flag."""
    global USE_CACHE
    USE_CACHE = False

Disable caching.

@contextmanager
def caching_enabled() -> Iterator[None]:
    """Context manager that temporarily enables caching.

    Restores the previous caching state on exit, even if the managed
    block raises (the original skipped restoration on exceptions).
    """
    was_enabled = caching_is_enabled()
    enable_caching()
    try:
        yield
    finally:
        if not was_enabled:
            disable_caching()

Context manager for temporarily enabling caching.

@contextmanager
def caching_disabled() -> Iterator[None]:
    """Context manager that temporarily disables caching.

    Restores the previous caching state on exit, even if the managed
    block raises (the original skipped restoration on exceptions).
    """
    was_enabled = caching_is_enabled()
    disable_caching()
    try:
        yield
    finally:
        if was_enabled:
            enable_caching()

Context manager for temporarily disabling caching.

@contextmanager
def logging_disabled() -> Iterator[None]:
    """Context manager that temporarily silences logging.

    Restores the previous quiet state on exit, even if the managed
    block raises (the original skipped restoration on exceptions).
    """
    was_quiet = logmap.is_quiet
    logmap.is_quiet = True
    try:
        yield
    finally:
        logmap.is_quiet = was_quiet

Context manager for temporarily disabling logging.

@contextmanager
def logging_enabled() -> Iterator[None]:
    """Context manager that temporarily enables logging.

    Restores the previous quiet state on exit, even if the managed
    block raises (the original skipped restoration on exceptions).
    """
    was_quiet = logmap.is_quiet
    logmap.is_quiet = False
    try:
        yield
    finally:
        logmap.is_quiet = was_quiet

Context manager for temporarily enabling logging.

def force_int(x: Any, errors: int = 0) -> int:
    """Coerce a value to ``int``, falling back on a default when impossible.

    Args:
        x: The value to coerce.
        errors: Fallback returned when coercion fails. Defaults to 0.

    Returns:
        ``int(x)`` on success, otherwise ``errors``.
    """
    try:
        result = int(x)
    except (ValueError, TypeError):
        return errors
    return result

Convert the input to an integer.

Arguments:
  • x: The input value to be converted to an integer.
  • errors: The value to be returned in case of an error. Defaults to 0.
Returns:

The input value converted to an integer if successful, otherwise the specified error value.

def tokenize_agnostic(txt: str) -> List[str]:
    """Split text into word and punctuation tokens, language-agnostically.

    Args:
        txt: The input text.

    Returns:
        The list of matched tokens.
    """
    # Words (with apostrophes) first; otherwise single punctuation/space chars.
    token_pattern = r"[\w']+|[.,!?; -—–'\n]"
    return re.findall(token_pattern, txt)

Tokenize text in a language-agnostic way.

Arguments:
  • txt: The input text.
Returns:

A list of tokens.