fabric / fabric
1 6
try:
2 6
    from invoke.vendor.six.moves.queue import Queue
3 0
except ImportError:
4 0
    from six.moves.queue import Queue
5

6 6
from invoke.util import ExceptionHandlingThread
7

8 6
from .connection import Connection
9 6
from .exceptions import GroupException
10

11

12 6
class Group(list):
13
    """
14
    A collection of `.Connection` objects whose API operates on its contents.
15

16
    .. warning::
17
        **This is a partially abstract class**; you need to use one of its
18
        concrete subclasses (such as `.SerialGroup` or `.ThreadingGroup`) or
19
        you'll get ``NotImplementedError`` on most of the methods.
20

21
    Most methods in this class mirror those of `.Connection`, taking the same
22
    arguments; however their return values and exception-raising behavior
23
    differs:
24

25
    - Return values are dict-like objects (`.GroupResult`) mapping
26
      `.Connection` objects to the return value for the respective connections:
27
      `.Group.run` returns a map of `.Connection` to `.runners.Result`,
28
      `.Group.get` returns a map of `.Connection` to `.transfer.Result`, etc.
29
    - If any connections encountered exceptions, a `.GroupException` is raised,
30
      which is a thin wrapper around what would otherwise have been the
31
      `.GroupResult` returned; within that wrapped `.GroupResult`, the
32
      excepting connections map to the exception that was raised, in place of a
33
      ``Result`` (as no ``Result`` was obtained.) Any non-excepting connections
34
      will have a ``Result`` value, as normal.
35

36
    For example, when no exceptions occur, a session might look like this::
37

38
        >>> group = SerialGroup('host1', 'host2')
39
        >>> group.run("this is fine")
40
        {
41
            <Connection host='host1'>: <Result cmd='this is fine' exited=0>,
42
            <Connection host='host2'>: <Result cmd='this is fine' exited=0>,
43
        }
44

45
    With exceptions (anywhere from 1 to "all of them"), it looks like so; note
46
    the different exception classes, e.g. `~invoke.exceptions.UnexpectedExit`
47
    for a completed session whose command exited poorly, versus
48
    `socket.gaierror` for a host that had DNS problems::
49

50
        >>> group = SerialGroup('host1', 'host2', 'notahost')
51
        >>> group.run("will it blend?")
52
        {
53
            <Connection host='host1'>: <Result cmd='will it blend?' exited=0>,
54
            <Connection host='host2'>: <UnexpectedExit: cmd='...' exited=1>,
55
            <Connection host='notahost'>: gaierror(...),
56
        }
57

58
    As with `.Connection`, `.Group` objects may be used as context managers,
59
    which will automatically `.close` the object on block exit.
60

61
    .. versionadded:: 2.0
62
    .. versionchanged:: 2.4
63
        Added context manager behavior.
64
    """
65

66 6
    def __init__(self, *hosts, **kwargs):
67
        """
68
        Create a group of connections from one or more shorthand host strings.
69

70
        See `.Connection` for details on the format of these strings - they
71
        will be used as the first positional argument of `.Connection`
72
        constructors.
73

74
        Any keyword arguments given will be forwarded directly to those
75
        `.Connection` constructors as well. For example, to get a serially
76
        executing group object that connects to ``admin@host1``,
77
        ``admin@host2`` and ``admin@host3``, and forwards your SSH agent too::
78

79
            group = SerialGroup(
80
                "host1", "host2", "host3", user="admin", forward_agent=True,
81
            )
82

83
        .. versionchanged:: 2.3
84
            Added ``**kwargs`` (was previously only ``*hosts``).
85
        """
86
        # TODO: #563, #388 (could be here or higher up in Program area)
87 6
        self.extend([Connection(host, **kwargs) for host in hosts])
88

89 6
    @classmethod
90 2
    def from_connections(cls, connections):
91
        """
92
        Alternate constructor accepting `.Connection` objects.
93

94
        .. versionadded:: 2.0
95
        """
96
        # TODO: *args here too; or maybe just fold into __init__ and type
97
        # check?
98 6
        group = cls()
99 6
        group.extend(connections)
100 6
        return group
101

102 6
    def run(self, *args, **kwargs):
103
        """
104
        Executes `.Connection.run` on all member `Connections <.Connection>`.
105

106
        :returns: a `.GroupResult`.
107

108
        .. versionadded:: 2.0
109
        """
110
        # TODO: probably best to suck it up & match actual run() sig?
111
        # TODO: how to change method of execution across contents? subclass,
112
        # kwargs, additional methods, inject an executor? Doing subclass for
113
        # now, but not 100% sure it's the best route.
114
        # TODO: also need way to deal with duplicate connections (see THOUGHTS)
115
        # TODO: and errors - probably FailureSet? How to handle other,
116
        # regular, non Failure, exceptions though? Still need an aggregate
117
        # exception type either way, whether it is FailureSet or what...
118
        # TODO: OTOH, users may well want to be able to operate on the hosts
119
        # that did not fail (esp if failure % is low) so we really _do_ want
120
        # something like a result object mixing success and failure, or maybe a
121
        # golang style two-tuple of successes and failures?
122
        # TODO: or keep going w/ a "return or except", but the object is
123
        # largely similar (if not identical) in both situations, with the
124
        # exception just being the signal that Shit Broke?
125 6
        raise NotImplementedError
126

127
    # TODO: how to handle sudo? Probably just an inner worker method that takes
128
    # the method name to actually call (run, sudo, etc)?
129

130
    # TODO: this all needs to mesh well with similar strategies applied to
131
    # entire tasks - so that may still end up factored out into Executors or
132
    # something lower level than both those and these?
133

134
    # TODO: local? Invoke wants ability to do that on its own though, which
135
    # would be distinct from Group. (May want to switch Group to use that,
136
    # though, whatever it ends up being?)
137

138 6
    def get(self, *args, **kwargs):
139
        """
140
        Executes `.Connection.get` on all member `Connections <.Connection>`.
141

142
        :returns: a `.GroupResult`.
143

144
        .. versionadded:: 2.0
145
        """
146
        # TODO: probably best to suck it up & match actual get() sig?
147
        # TODO: actually implement on subclasses
148 0
        raise NotImplementedError
149

150 6
    def close(self):
151
        """
152
        Executes `.Connection.close` on all member `Connections <.Connection>`.
153

154
        .. versionadded:: 2.4
155
        """
156 6
        for cxn in self:
157 6
            cxn.close()
158

159 6
    def __enter__(self):
160 6
        return self
161

162 6
    def __exit__(self, *exc):
163 6
        self.close()
164

165

166 6
class SerialGroup(Group):
167
    """
168
    Subclass of `.Group` which executes in simple, serial fashion.
169

170
    .. versionadded:: 2.0
171
    """
172

173 6
    def run(self, *args, **kwargs):
174 6
        results = GroupResult()
175 6
        excepted = False
176 6
        for cxn in self:
177 6
            try:
178 6
                results[cxn] = cxn.run(*args, **kwargs)
179 6
            except Exception as e:
180 6
                results[cxn] = e
181 6
                excepted = True
182 6
        if excepted:
183 6
            raise GroupException(results)
184 6
        return results
185

186

187 6
def thread_worker(cxn, queue, args, kwargs):
188 6
    result = cxn.run(*args, **kwargs)
189
    # TODO: namedtuple or attrs object?
190 6
    queue.put((cxn, result))
191

192

193 6
class ThreadingGroup(Group):
194
    """
195
    Subclass of `.Group` which uses threading to execute concurrently.
196

197
    .. versionadded:: 2.0
198
    """
199

200 6
    def run(self, *args, **kwargs):
201 6
        results = GroupResult()
202 6
        queue = Queue()
203 6
        threads = []
204 6
        for cxn in self:
205 6
            my_kwargs = dict(cxn=cxn, queue=queue, args=args, kwargs=kwargs)
206 6
            thread = ExceptionHandlingThread(
207
                target=thread_worker, kwargs=my_kwargs
208
            )
209 6
            threads.append(thread)
210 6
        for thread in threads:
211 6
            thread.start()
212 6
        for thread in threads:
213
            # TODO: configurable join timeout
214
            # TODO: (in sudo's version) configurability around interactive
215
            # prompting resulting in an exception instead, as in v1
216 6
            thread.join()
217
        # Get non-exception results from queue
218 6
        while not queue.empty():
219
            # TODO: io-sleep? shouldn't matter if all threads are now joined
220 6
            cxn, result = queue.get(block=False)
221
            # TODO: outstanding musings about how exactly aggregate results
222
            # ought to ideally operate...heterogenous obj like this, multiple
223
            # objs, ??
224 6
            results[cxn] = result
225
        # Get exceptions from the threads themselves.
226
        # TODO: in a non-thread setup, this would differ, e.g.:
227
        # - a queue if using multiprocessing
228
        # - some other state-passing mechanism if using e.g. coroutines
229
        # - ???
230 6
        excepted = False
231 6
        for thread in threads:
232 6
            wrapper = thread.exception()
233 6
            if wrapper is not None:
234
                # Outer kwargs is Thread instantiation kwargs, inner is kwargs
235
                # passed to thread target/body.
236 6
                cxn = wrapper.kwargs["kwargs"]["cxn"]
237 6
                results[cxn] = wrapper.value
238 6
                excepted = True
239 6
        if excepted:
240 6
            raise GroupException(results)
241 6
        return results
242

243

244 6
class GroupResult(dict):
245
    """
246
    Collection of results and/or exceptions arising from `.Group` methods.
247

248
    Acts like a dict, but adds a couple convenience methods, to wit:
249

250
    - Keys are the individual `.Connection` objects from within the `.Group`.
251
    - Values are either return values / results from the called method (e.g.
252
      `.runners.Result` objects), *or* an exception object, if one prevented
253
      the method from returning.
254
    - Subclasses `dict`, so has all dict methods.
255
    - Has `.succeeded` and `.failed` attributes containing sub-dicts limited to
256
      just those key/value pairs that succeeded or encountered exceptions,
257
      respectively.
258

259
      - Of note, these attributes allow high level logic, e.g. ``if
260
        mygroup.run('command').failed`` and so forth.
261

262
    .. versionadded:: 2.0
263
    """
264

265 6
    def __init__(self, *args, **kwargs):
266 6
        super(dict, self).__init__(*args, **kwargs)
267 6
        self._successes = {}
268 6
        self._failures = {}
269

270 6
    def _bifurcate(self):
271
        # Short-circuit to avoid reprocessing every access.
272 6
        if self._successes or self._failures:
273 6
            return
274
        # TODO: if we ever expect .succeeded/.failed to be useful before a
275
        # GroupResult is fully initialized, this needs to become smarter.
276 6
        for key, value in self.items():
277 6
            if isinstance(value, BaseException):
278 6
                self._failures[key] = value
279
            else:
280 6
                self._successes[key] = value
281

282 6
    @property
283 2
    def succeeded(self):
284
        """
285
        A sub-dict containing only successful results.
286

287
        .. versionadded:: 2.0
288
        """
289 6
        self._bifurcate()
290 6
        return self._successes
291

292 6
    @property
293 2
    def failed(self):
294
        """
295
        A sub-dict containing only failed results.
296

297
        .. versionadded:: 2.0
298
        """
299 6
        self._bifurcate()
300 6
        return self._failures

Read our documentation on viewing source code .

Loading