fabric / fabric
1 1
try:
2 1
    from invoke.vendor.six.moves.queue import Queue
3 0
except ImportError:
4 0
    from six.moves.queue import Queue
5

6 1
from invoke.util import ExceptionHandlingThread
7

8 1
from .connection import Connection
9 1
from .exceptions import GroupException
10

11

12 1
class Group(list):
    """
    A list of `.Connection` objects exposing an API that acts on all of them.

    .. warning::
        **This class is partially abstract.** Use a concrete subclass (for
        example `.SerialGroup` or `.ThreadingGroup`); calling most methods
        directly on `.Group` raises ``NotImplementedError``.

    The bulk of the methods here mirror `.Connection` methods and accept the
    same arguments. What differs is how results and errors are reported:

    - Return values are dict-like `.GroupResult` objects which map each
      `.Connection` to the value that connection's call returned. So
      `.Group.run` yields a map of `.Connection` to `.runners.Result`,
      `.Group.get` yields a map of `.Connection` to `.transfer.Result`, and
      so on.
    - When any connection raises, a `.GroupException` is raised instead. It
      is a thin wrapper around the `.GroupResult` that would otherwise have
      been returned; inside that `.GroupResult`, connections which raised map
      to their exception object (there being no ``Result`` to store), while
      connections which did not raise map to a ``Result`` as usual.

    A session in which nothing goes wrong might look like this::

        >>> group = SerialGroup('host1', 'host2')
        >>> group.run("this is fine")
        {
            <Connection host='host1'>: <Result cmd='this is fine' exited=0>,
            <Connection host='host2'>: <Result cmd='this is fine' exited=0>,
        }

    When exceptions occur (for anywhere from one to all of the hosts) it
    looks like the following. Note the differing exception classes:
    `~invoke.exceptions.UnexpectedExit` for a session that completed but
    whose command exited poorly, and `socket.gaierror` for a host with DNS
    problems::

        >>> group = SerialGroup('host1', 'host2', 'notahost')
        >>> group.run("will it blend?")
        {
            <Connection host='host1'>: <Result cmd='will it blend?' exited=0>,
            <Connection host='host2'>: <UnexpectedExit: cmd='...' exited=1>,
            <Connection host='notahost'>: gaierror(...),
        }

    Like `.Connection`, a `.Group` can be used as a context manager, in which
    case it is automatically closed (see `.close`) when the block exits.

    .. versionadded:: 2.0
    .. versionchanged:: 2.4
        Added context manager behavior.
    """

    def __init__(self, *hosts, **kwargs):
        """
        Build a group of connections from one or more shorthand host strings.

        Each string becomes the first positional argument of a `.Connection`
        constructor; see `.Connection` for the accepted format.

        All keyword arguments are passed through unchanged to every one of
        those `.Connection` constructors. For instance, a serially executing
        group that connects to ``admin@host1``, ``admin@host2`` and
        ``admin@host3`` while forwarding your SSH agent::

            group = SerialGroup(
                "host1", "host2", "host3", user="admin", forward_agent=True,
            )

        .. versionchanged:: 2.3
            Added ``**kwargs`` (was previously only ``*hosts``).
        """
        # TODO: #563, #388 (could be here or higher up in Program area)
        # Build every connection first, then add them all in one step.
        connections = [Connection(host, **kwargs) for host in hosts]
        self.extend(connections)

    @classmethod
    def from_connections(cls, connections):
        """
        Alternate constructor taking ready-made `.Connection` objects.

        :param connections: iterable of objects to place in the new group.

        :returns: a new instance of the class this was called on.

        .. versionadded:: 2.0
        """
        # TODO: *args here too; or maybe just fold into __init__ and type
        # check?
        # Start from an empty group (no host strings) and fill it directly.
        instance = cls()
        instance.extend(connections)
        return instance

    def run(self, *args, **kwargs):
        """
        Call `.Connection.run` on every member `Connection <.Connection>`.

        Abstract here: always raises ``NotImplementedError``; subclasses
        provide the actual execution strategy.

        :returns: a `.GroupResult`.

        .. versionadded:: 2.0
        """
        # TODO: probably best to suck it up & match actual run() sig?
        # TODO: how to change method of execution across contents? subclass,
        # kwargs, additional methods, inject an executor? Doing subclass for
        # now, but not 100% sure it's the best route.
        # TODO: also need way to deal with duplicate connections (see THOUGHTS)
        # TODO: and errors - probably FailureSet? How to handle other,
        # regular, non Failure, exceptions though? Still need an aggregate
        # exception type either way, whether it is FailureSet or what...
        # TODO: OTOH, users may well want to be able to operate on the hosts
        # that did not fail (esp if failure % is low) so we really _do_ want
        # something like a result object mixing success and failure, or maybe a
        # golang style two-tuple of successes and failures?
        # TODO: or keep going w/ a "return or except", but the object is
        # largely similar (if not identical) in both situations, with the
        # exception just being the signal that something broke?
        raise NotImplementedError

    # TODO: how to handle sudo? Probably just an inner worker method that takes
    # the method name to actually call (run, sudo, etc)?

    # TODO: this all needs to mesh well with similar strategies applied to
    # entire tasks - so that may still end up factored out into Executors or
    # something lower level than both those and these?

    # TODO: local? Invoke wants ability to do that on its own though, which
    # would be distinct from Group. (May want to switch Group to use that,
    # though, whatever it ends up being?)

    def get(self, *args, **kwargs):
        """
        Call `.Connection.get` on every member `Connection <.Connection>`.

        Abstract here: always raises ``NotImplementedError``.

        :returns: a `.GroupResult`.

        .. versionadded:: 2.0
        """
        # TODO: probably best to suck it up & match actual get() sig?
        # TODO: actually implement on subclasses
        raise NotImplementedError

    def close(self):
        """
        Call `.Connection.close` on every member `Connection <.Connection>`.

        Members are closed one after another, in list order.

        .. versionadded:: 2.4
        """
        for connection in self:
            connection.close()

    def __enter__(self):
        # Context-manager entry: the group itself is the managed object.
        return self

    def __exit__(self, *exc):
        # Context-manager exit: close all members. Nothing truthy is
        # returned, so an exception raised inside the block propagates.
        self.close()
164

165

166 1
class SerialGroup(Group):
    """
    `.Group` subclass that works through its members one at a time, in order.

    .. versionadded:: 2.0
    """

    def run(self, *args, **kwargs):
        """
        Call ``run(*args, **kwargs)`` on each member in turn.

        A member that raises does not stop the loop: its exception is stored
        as that member's value and the remaining members still execute.

        :returns: a `.GroupResult` mapping each member to its return value.

        :raises GroupException:
            wrapping the same `.GroupResult`, if at least one member raised.
        """
        outcome = GroupResult()
        failure_count = 0
        for connection in self:
            try:
                outcome[connection] = connection.run(*args, **kwargs)
            except Exception as error:
                # Record the failure in place of a result and keep going.
                outcome[connection] = error
                failure_count += 1
        if failure_count:
            raise GroupException(outcome)
        return outcome
185

186

187 1
def thread_worker(cxn, queue, args, kwargs):
    """
    Thread body: run a command on one connection and publish the result.

    Calls ``cxn.run(*args, **kwargs)`` and puts a ``(cxn, result)`` tuple on
    ``queue``. Exceptions are not caught here, so when ``run`` raises nothing
    is enqueued and the exception propagates to the caller.
    """
    # TODO: namedtuple or attrs object?
    queue.put((cxn, cxn.run(*args, **kwargs)))
191

192

193 1
class ThreadingGroup(Group):
    """
    `.Group` subclass that runs its members concurrently, one thread each.

    .. versionadded:: 2.0
    """

    def run(self, *args, **kwargs):
        """
        Call ``run(*args, **kwargs)`` on every member, each in its own thread.

        All threads are created, then all started, then all joined before any
        results are collected.

        :returns: a `.GroupResult` mapping each member to its return value.

        :raises GroupException:
            wrapping the same `.GroupResult`, if any thread reported an
            exception; the affected members map to the exception value.
        """
        outcome = GroupResult()
        channel = Queue()
        # One worker thread per member; nothing is started until all exist.
        workers = [
            ExceptionHandlingThread(
                target=thread_worker,
                kwargs=dict(cxn=cxn, queue=channel, args=args, kwargs=kwargs),
            )
            for cxn in self
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            # TODO: configurable join timeout
            # TODO: (in sudo's version) configurability around interactive
            # prompting resulting in an exception instead, as in v1
            worker.join()
        # Successful results were put on the queue by the workers; drain it.
        # TODO: io-sleep? shouldn't matter if all threads are now joined
        while not channel.empty():
            member, result = channel.get(block=False)
            # TODO: outstanding musings about how exactly aggregate results
            # ought to ideally operate...heterogenous obj like this, multiple
            # objs, ??
            outcome[member] = result
        # Failures are read back from the thread objects themselves.
        # TODO: in a non-thread setup, this would differ, e.g.:
        # - a queue if using multiprocessing
        # - some other state-passing mechanism if using e.g. coroutines
        # - ???
        any_failed = False
        for worker in workers:
            wrapper = worker.exception()
            if wrapper is None:
                continue
            # Outer kwargs is Thread instantiation kwargs, inner is kwargs
            # passed to thread target/body.
            failed_member = wrapper.kwargs["kwargs"]["cxn"]
            outcome[failed_member] = wrapper.value
            any_failed = True
        if any_failed:
            raise GroupException(outcome)
        return outcome
242

243

244 1
class GroupResult(dict):
    """
    Collection of results and/or exceptions arising from `.Group` methods.

    Acts like a dict, but adds a couple convenience methods, to wit:

    - Keys are the individual `.Connection` objects from within the `.Group`.
    - Values are either return values / results from the called method (e.g.
      `.runners.Result` objects), *or* an exception object, if one prevented
      the method from returning.
    - Subclasses `dict`, so has all dict methods.
    - Has `.succeeded` and `.failed` attributes containing sub-dicts limited to
      just those key/value pairs that succeeded or encountered exceptions,
      respectively.

      - Of note, these attributes allow high level logic, e.g. ``if
        mygroup.run('command').failed`` and so forth.

    .. versionadded:: 2.0
    """

    def __init__(self, *args, **kwargs):
        """
        Accept the same arguments as `dict` and populate the mapping from them.
        """
        # Name this class (not ``dict``) in super(): ``super(dict, self)``
        # starts the lookup *after* dict in the MRO, so it would invoke
        # object.__init__, skipping dict.__init__ entirely - initial data
        # would never be stored, and any argument raises TypeError.
        super(GroupResult, self).__init__(*args, **kwargs)
        # Lazily-filled caches backing .succeeded / .failed.
        self._successes = {}
        self._failures = {}

    def _bifurcate(self):
        """
        Split the current items into the success and failure caches.

        Values that are `BaseException` instances count as failures;
        everything else counts as a success.
        """
        # Short-circuit to avoid reprocessing every access. Consequence: once
        # either cache is non-empty, later changes to the mapping are not
        # reflected in .succeeded / .failed.
        if self._successes or self._failures:
            return
        # TODO: if we ever expect .succeeded/.failed to be useful before a
        # GroupResult is fully initialized, this needs to become smarter.
        for key, value in self.items():
            if isinstance(value, BaseException):
                self._failures[key] = value
            else:
                self._successes[key] = value

    @property
    def succeeded(self):
        """
        A sub-dict containing only successful results.

        .. versionadded:: 2.0
        """
        self._bifurcate()
        return self._successes

    @property
    def failed(self):
        """
        A sub-dict containing only failed results.

        .. versionadded:: 2.0
        """
        self._bifurcate()
        return self._failures

Read our documentation on viewing source code .

Loading