from functools import partial
from typing import Union

import numpy as np
import pandas as pd
from dask import dataframe as dd
from dask.distributed import Client

from .core import BaseLFApplier, _FunctionCaller
from .pandas import apply_lfs_to_data_point, rows_to_triplets

Scheduler = Union[str, Client]


class DaskLFApplier(BaseLFApplier):
    """Applies labeling functions over the rows of a Dask DataFrame.

    A Dask DataFrame is a collection of partitions, each a Pandas
    DataFrame, which allows LF application to run over rows in parallel.
    For more information, see https://docs.dask.org/en/stable/dataframe.html
    """

    def apply(
        self,
        df: dd.DataFrame,
        scheduler: Scheduler = "processes",
        fault_tolerant: bool = False,
    ) -> np.ndarray:
        """Label each data point in a Dask DataFrame with this applier's LFs.

        Parameters
        ----------
        df
            Dask DataFrame whose rows are the data points to be labeled
        scheduler
            A Dask scheduling configuration: either a string option or
            a ``Client``. For more information, see
            https://docs.dask.org/en/stable/scheduling.html#
        fault_tolerant
            Output ``-1`` if LF execution fails?

        Returns
        -------
        np.ndarray
            Matrix of labels emitted by LFs
        """
        caller = _FunctionCaller(fault_tolerant)
        apply_fn = partial(apply_lfs_to_data_point, lfs=self._lfs, f_caller=caller)

        def label_partition(partition):
            # Run the per-row LF application within a single Pandas partition.
            return partition.apply(apply_fn, axis=1)

        labels = df.map_partitions(label_partition).compute(scheduler=scheduler)
        return self._numpy_from_row_data(rows_to_triplets(labels))


class PandasParallelLFApplier(DaskLFApplier):
    """Parallel LF applier for a Pandas DataFrame.

    Converts the input Pandas DataFrame into a Dask DataFrame, then
    delegates to ``DaskLFApplier`` to label the data in parallel.
    See ``DaskLFApplier``.
    """

    def apply(  # type: ignore
        self,
        df: pd.DataFrame,
        n_parallel: int = 2,
        scheduler: Scheduler = "processes",
        fault_tolerant: bool = False,
    ) -> np.ndarray:
        """Label a Pandas DataFrame of data points with LFs in parallel via Dask.

        Parameters
        ----------
        df
            Pandas DataFrame whose rows are the data points to be labeled
        n_parallel
            Parallelism level for LF application. Corresponds to ``npartitions``
            in constructed Dask DataFrame. For ``scheduler="processes"``, number
            of processes launched. Recommended to be no more than the number
            of cores on the running machine.
        scheduler
            A Dask scheduling configuration: either a string option or
            a ``Client``. For more information, see
            https://docs.dask.org/en/stable/scheduling.html#
        fault_tolerant
            Output ``-1`` if LF execution fails?

        Returns
        -------
        np.ndarray
            Matrix of labels emitted by LFs
        """
        # Guard clause: parallelism below 2 defeats the purpose of this class.
        if n_parallel < 2:
            raise ValueError(
                "n_parallel should be >= 2. "
                "For single process Pandas, use PandasLFApplier."
            )
        dask_df = dd.from_pandas(df, npartitions=n_parallel)
        return super().apply(
            dask_df, scheduler=scheduler, fault_tolerant=fault_tolerant
        )