1
using System;
2
using System.Threading;
3
using System.Threading.Tasks;
4
using NetMQ;
5

6
namespace Libplanet.Net
7
{
8
    /// <summary>
    /// Extension methods that add an optional timeout on top of the
    /// cancellable multipart send/receive operations of
    /// <see cref="NetMQSocket"/>.
    /// </summary>
    internal static class NetMQSocketExtensions
    {
        /// <summary>
        /// Sends <paramref name="message"/> through <paramref name="socket"/>,
        /// cancelling the underlying send when either
        /// <paramref name="cancellationToken"/> is signalled or
        /// <paramref name="timeout"/> elapses.
        /// </summary>
        /// <param name="socket">The socket to send the message through.</param>
        /// <param name="message">The multipart message to send.</param>
        /// <param name="timeout">The maximum time to wait for.  <c>null</c>
        /// means no time limit is applied by this method.</param>
        /// <param name="cancellationToken">A token to cancel the send.</param>
        /// <returns>A task that completes when the underlying send completes.
        /// A failure or a cancellation of the underlying send is propagated
        /// to the returned task.</returns>
        /// <exception cref="TimeoutException">Raised through the returned task
        /// when the underlying send was cancelled and the timeout had been
        /// triggered.</exception>
        public static Task SendMultipartMessageAsync(
            this NetMQSocket socket,
            NetMQMessage message,
            TimeSpan? timeout = null,
            CancellationToken cancellationToken = default(CancellationToken))
        {
            var cts = new CancellationTokenSource();
            if (timeout is TimeSpan timeoutNotNull)
            {
                cts.CancelAfter(timeoutNotNull);
            }

            // Cancels when either the caller's token or the timeout fires.
            var ct = CancellationTokenSource.CreateLinkedTokenSource(
                cancellationToken,
                cts.Token
            );

            return socket.SendMultipartMessageAsync(
                message,
                false,
                cancellationToken: ct.Token
            ).ContinueWith(t =>
            {
                try
                {
                    if (t.IsCanceled && cts.IsCancellationRequested)
                    {
                        throw new TimeoutException(
                            $"The operation exceeded the specified time: {timeout}."
                        );
                    }

                    return t;
                }
                finally
                {
                    // Release both sources on every path, including the
                    // timeout path which exits by throwing.
                    ct.Dispose();
                    cts.Dispose();
                }
            }).Unwrap(); // Without this the Task<Task> hides the send's failure.
        }

        /// <summary>
        /// Receives a multipart message from <paramref name="socket"/>,
        /// cancelling the underlying receive when either
        /// <paramref name="cancellationToken"/> is signalled or
        /// <paramref name="timeout"/> elapses.  The underlying receive is
        /// invoked with an expected frame count of 4.
        /// </summary>
        /// <param name="socket">The socket to receive a message from.</param>
        /// <param name="timeout">The maximum time to wait for.  <c>null</c>
        /// means no time limit is applied by this method.</param>
        /// <param name="cancellationToken">A token to cancel the receive.
        /// </param>
        /// <returns>A task whose result is the received message.</returns>
        /// <exception cref="TimeoutException">Raised through the returned task
        /// when the underlying receive was cancelled and the timeout had been
        /// triggered.</exception>
        /// <remarks>Any other failure or cancellation of the underlying
        /// receive is surfaced by reading <see cref="Task{TResult}.Result"/>
        /// of the completed task, i.e., wrapped in
        /// an <see cref="AggregateException"/>.</remarks>
        public static Task<NetMQMessage> ReceiveMultipartMessageAsync(
            this NetMQSocket socket,
            TimeSpan? timeout = null,
            CancellationToken cancellationToken = default(CancellationToken))
        {
            var cts = new CancellationTokenSource();
            if (timeout is TimeSpan timeoutNotNull)
            {
                cts.CancelAfter(timeoutNotNull);
            }

            // Cancels when either the caller's token or the timeout fires.
            var ct = CancellationTokenSource.CreateLinkedTokenSource(
                cancellationToken,
                cts.Token
            );

            return socket.ReceiveMultipartMessageAsync(
                expectedFrameCount: 4,
                cancellationToken: ct.Token
            ).ContinueWith(t =>
            {
                try
                {
                    if (t.IsCanceled && cts.IsCancellationRequested)
                    {
                        throw new TimeoutException(
                            $"The operation exceeded the specified time: {timeout}."
                        );
                    }

                    // The antecedent has already completed here, so this
                    // never blocks; it rethrows if the receive failed.
                    return t.Result;
                }
                finally
                {
                    // Release both sources on every path, including the
                    // timeout path which exits by throwing.
                    ct.Dispose();
                    cts.Dispose();
                }
            });
        }
    }
80
}

Read our documentation on viewing source code.

Loading