1 2
from functools import partial
2 2
from typing import List, Tuple, Union
3

4 2
import numpy as np
5 2
import pandas as pd
6 2
from tqdm import tqdm
7

8 2
from snorkel.labeling.lf import LabelingFunction
9 2
from snorkel.types import DataPoint
10

11 2
from .core import ApplierMetadata, BaseLFApplier, RowData, _FunctionCaller
12

13 2
# Sparse labels for a single data point: a list of (LF index, label) tuples,
# as produced by ``apply_lfs_to_data_point``.
PandasRowData = List[Tuple[int, int]]
14

15

16 2
def apply_lfs_to_data_point(
    x: DataPoint, lfs: List[LabelingFunction], f_caller: _FunctionCaller
) -> PandasRowData:
    """Run every LF on one data point and collect the non-negative outputs.

    Parameters
    ----------
    x
        Data point to label
    lfs
        LFs to apply to ``x``; the position of an LF in this list is the
        index reported in the output
    f_caller
        Callable invoked as ``f_caller(lf, x)`` to execute each LF
        (a ``_FunctionCaller``, which records failed LF executions)

    Returns
    -------
    PandasRowData
        A list of (LF index, label) tuples, in LF order. Outputs that are
        not ``>= 0`` (e.g. ``-1``) are left out.
    """
    # Lazy generator: each LF is called right before its output is filtered,
    # so LFs execute one at a time in list order.
    outputs = ((lf_idx, f_caller(lf, x)) for lf_idx, lf in enumerate(lfs))
    return [(lf_idx, label) for lf_idx, label in outputs if label >= 0]
41

42

43 2
def rows_to_triplets(labels: List[PandasRowData]) -> List[RowData]:
    """Convert list of list sparse matrix representation to list of triplets.

    Parameters
    ----------
    labels
        One list of (LF index, label) tuples per data point, in row order

    Returns
    -------
    List[RowData]
        For each row, a list of (row position, LF index, label) triplets,
        where the row position is the 0-based position of the row in ``labels``
    """
    triplets = []
    for row_pos, pairs in enumerate(labels):
        row_triplets = []
        for lf_idx, label in pairs:
            row_triplets.append((row_pos, lf_idx, label))
        triplets.append(row_triplets)
    return triplets
49

50

51 2
class PandasLFApplier(BaseLFApplier):
    """Apply labeling functions to the rows of a Pandas DataFrame.

    Each row of the DataFrame is passed to the LFs as a ``Series``.
    Execution goes through ``pandas.DataFrame.apply``, which runs in a
    single process and can therefore be slow on large DataFrames.
    For large datasets, consider ``DaskLFApplier`` or ``SparkLFApplier``.

    Parameters
    ----------
    lfs
        LFs that this applier executes on examples

    Example
    -------
    >>> from snorkel.labeling import labeling_function
    >>> @labeling_function()
    ... def is_big_num(x):
    ...     return 1 if x.num > 42 else 0
    >>> applier = PandasLFApplier([is_big_num])
    >>> applier.apply(pd.DataFrame(dict(num=[10, 100], text=["hello", "hi"])))
    array([[0], [1]])
    """

    def apply(
        self,
        df: pd.DataFrame,
        progress_bar: bool = True,
        fault_tolerant: bool = False,
        return_meta: bool = False,
    ) -> Union[np.ndarray, Tuple[np.ndarray, ApplierMetadata]]:
        """Run the applier's LFs over every row of a Pandas DataFrame.

        Parameters
        ----------
        df
            Pandas DataFrame containing data points to be labeled by LFs
        progress_bar
            Display a progress bar?
        fault_tolerant
            Output ``-1`` if LF execution fails?
        return_meta
            Return metadata from apply call?

        Returns
        -------
        np.ndarray
            Matrix of labels emitted by LFs
        ApplierMetadata
            Metadata, such as fault counts, for the apply call
            (only returned when ``return_meta`` is true)
        """
        caller = _FunctionCaller(fault_tolerant)
        label_row = partial(apply_lfs_to_data_point, lfs=self._lfs, f_caller=caller)

        if progress_bar:
            # tqdm.pandas() must run first: it is what makes
            # ``progress_apply`` available on the DataFrame.
            tqdm.pandas()
            row_labels = df.progress_apply(label_row, axis=1)
        else:
            row_labels = df.apply(label_row, axis=1)

        # Tag each (LF index, label) pair with its row position, then let the
        # base class build the label matrix from the triplets.
        label_matrix = self._numpy_from_row_data(rows_to_triplets(row_labels))

        if not return_meta:
            return label_matrix
        return label_matrix, ApplierMetadata(caller.fault_counts)

Read our documentation on viewing source code.

Loading