fabric / fabric
1
"""
2
Tunnel and connection forwarding internals.
3

4
If you're looking for simple, end-user-focused connection forwarding, please
5
see `.Connection`, e.g. `.Connection.forward_local`.
6
"""
7

8 1
import errno
9 1
import select
10 1
import socket
11 1
import time
12 1
from threading import Event
13

14 1
from invoke.exceptions import ThreadException
15 1
from invoke.util import ExceptionHandlingThread
16

17

18 1
class TunnelManager(ExceptionHandlingThread):
19
    """
20
    Thread subclass for tunnelling connections over SSH between two endpoints.
21

22
    Specifically, one instance of this class is sufficient to sit around
23
    forwarding any number of individual connections made to one end of the
24
    tunnel or the other. If you need to forward connections between more than
25
    one set of ports, you'll end up instantiating multiple TunnelManagers.
26

27
    Wraps a `~paramiko.transport.Transport`, which should already be connected
28
    to the remote server.
29

30
    .. versionadded:: 2.0
31
    """
32

33 1
    def __init__(
34
        self,
35
        local_host,
36
        local_port,
37
        remote_host,
38
        remote_port,
39
        transport,
40
        finished,
41
    ):
42 1
        super(TunnelManager, self).__init__()
43 1
        self.local_address = (local_host, local_port)
44 1
        self.remote_address = (remote_host, remote_port)
45 1
        self.transport = transport
46 1
        self.finished = finished
47

48 1
    def _run(self):
49
        # Track each tunnel that gets opened during our lifetime
50 1
        tunnels = []
51

52
        # Set up OS-level listener socket on forwarded port
53 1
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
54
        # TODO: why do we want REUSEADDR exactly? and is it portable?
55 1
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
56
        # NOTE: choosing to deal with nonblocking semantics and a fast loop,
57
        # versus an older approach which blocks & expects outer scope to cause
58
        # a socket exception by close()ing the socket.
59 1
        sock.setblocking(0)
60 1
        sock.bind(self.local_address)
61 1
        sock.listen(1)
62

63 1
        while not self.finished.is_set():
64
            # Main loop-wait: accept connections on the local listener
65
            # NOTE: EAGAIN means "you're nonblocking and nobody happened to
66
            # connect at this point in time"
67 1
            try:
68 1
                tun_sock, local_addr = sock.accept()
69
                # Set TCP_NODELAY to match OpenSSH's forwarding socket behavior
70 1
                tun_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
71 1
            except socket.error as e:
72 1
                if e.errno is errno.EAGAIN:
73
                    # TODO: make configurable
74 1
                    time.sleep(0.01)
75 1
                    continue
76 0
                raise
77

78
            # Set up direct-tcpip channel on server end
79
            # TODO: refactor w/ what's used for gateways
80 1
            channel = self.transport.open_channel(
81
                "direct-tcpip", self.remote_address, local_addr
82
            )
83

84
            # Set up 'worker' thread for this specific connection to our
85
            # tunnel, plus its dedicated signal event (which will appear as a
86
            # public attr, no need to track both independently).
87 1
            finished = Event()
88 1
            tunnel = Tunnel(channel=channel, sock=tun_sock, finished=finished)
89 1
            tunnel.start()
90 1
            tunnels.append(tunnel)
91

92 1
        exceptions = []
93
        # Propogate shutdown signal to all tunnels & wait for closure
94
        # TODO: would be nice to have some output or at least logging here,
95
        # especially for "sets up a handful of tunnels" use cases like
96
        # forwarding nontrivial HTTP traffic.
97 1
        for tunnel in tunnels:
98 1
            tunnel.finished.set()
99 1
            tunnel.join()
100 1
            wrapper = tunnel.exception()
101 1
            if wrapper:
102 1
                exceptions.append(wrapper)
103
        # Handle exceptions
104 1
        if exceptions:
105 1
            raise ThreadException(exceptions)
106

107
        # All we have left to close is our own sock.
108
        # TODO: use try/finally?
109 1
        sock.close()
110

111

112 1
class Tunnel(ExceptionHandlingThread):
113
    """
114
    Bidirectionally forward data between an SSH channel and local socket.
115

116
    .. versionadded:: 2.0
117
    """
118

119 1
    def __init__(self, channel, sock, finished):
120 1
        self.channel = channel
121 1
        self.sock = sock
122 1
        self.finished = finished
123 1
        self.socket_chunk_size = 1024
124 1
        self.channel_chunk_size = 1024
125 1
        super(Tunnel, self).__init__()
126

127 1
    def _run(self):
128 1
        try:
129 1
            empty_sock, empty_chan = None, None
130 1
            while not self.finished.is_set():
131 1
                r, w, x = select.select([self.sock, self.channel], [], [], 1)
132 1
                if self.sock in r:
133 1
                    empty_sock = self.read_and_write(
134
                        self.sock, self.channel, self.socket_chunk_size
135
                    )
136 1
                if self.channel in r:
137 1
                    empty_chan = self.read_and_write(
138
                        self.channel, self.sock, self.channel_chunk_size
139
                    )
140 1
                if empty_sock or empty_chan:
141 0
                    break
142
        finally:
143 1
            self.channel.close()
144 1
            self.sock.close()
145

146 1
    def read_and_write(self, reader, writer, chunk_size):
147
        """
148
        Read ``chunk_size`` from ``reader``, writing result to ``writer``.
149

150
        Returns ``None`` if successful, or ``True`` if the read was empty.
151

152
        .. versionadded:: 2.0
153
        """
154 1
        data = reader.recv(chunk_size)
155 1
        if len(data) == 0:
156 0
            return True
157 1
        writer.sendall(data)

Read our documentation on viewing source code .

Loading