1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using Libplanet.Crypto;
7
using Libplanet.Net;
8
using Libplanet.Net.Protocols;
9
using Serilog;
10
using Xunit;
11
using Xunit.Abstractions;
12

13
namespace Libplanet.Tests.Net.Protocols
14
{
15
    public class ProtocolTest
16
    {
17
        // Per-test time limit used for the Timeout property of the [Fact]/[Theory]
        // attributes in this class: 60 * 1000, which xunit reads as milliseconds (60 s).
        private const int Timeout = 60 * 1000;
18
        // Dictionary handed to every TestTransport built by CreateTestTransport.
        // NOTE(review): presumably the in-memory registry the transports use to reach
        // each other by address -- confirm against TestTransport.
        private readonly Dictionary<Address, TestTransport> _transports;
19

20 1
        /// <summary>
        /// Creates the dictionary that is passed to every transport built by this
        /// class and points the global Serilog logger at the xunit test output.
        /// </summary>
        /// <param name="output">The xunit output helper that log events are written to.</param>
        public ProtocolTest(ITestOutputHelper output)
        {
            _transports = new Dictionary<Address, TestTransport>();

            const string template =
                "{Timestamp:HH:mm:ss}[@{Address}][{ThreadId}] - {Message}";

            // Verbose level with the thread id enriched, so the {ThreadId} token in the
            // template above has a value to render.
            LoggerConfiguration configuration = new LoggerConfiguration()
                .MinimumLevel.Verbose()
                .Enrich.WithThreadId()
                .WriteTo.TestOutput(output, outputTemplate: template);

            Log.Logger = configuration.CreateLogger().ForContext<ProtocolTest>();
        }
33

34
        /// <summary>
        /// Checks <c>Kademlia.CalculateDistance</c> and <c>Kademlia.CommonPrefixLength</c>
        /// against fixed addresses, and that the hex forms of those addresses are in
        /// non-descending ordinal order.
        /// </summary>
        [Fact]
        public void KademliaTest()
        {
            Address zero = new Address("0000000000000000000000000000000000000000");
            Address one = new Address("0000000000000000000000000000000000000001");
            Address twelve = new Address("000000000000000000000000000000000000000c");
            Address distant = new Address("0000000001000001111110001000011001000001");

            Address expectedDistance = new Address("000000000100000111111000100001100100000d");
            Assert.Equal(expectedDistance, Kademlia.CalculateDistance(twelve, distant));

            Assert.Equal(159, Kademlia.CommonPrefixLength(zero, one));
            Assert.Equal(156, Kademlia.CommonPrefixLength(zero, twelve));
            Assert.Equal(39, Kademlia.CommonPrefixLength(zero, distant));

            // Each hex string must compare less than or equal to its successor.
            string[] hexes = { zero.ToHex(), one.ToHex(), twelve.ToHex(), distant.ToHex() };
            for (int i = 0; i + 1 < hexes.Length; i++)
            {
                Assert.True(string.CompareOrdinal(hexes[i], hexes[i + 1]) <= 0);
            }
        }
54

55
        /// <summary>
        /// Verifies that sending a ping from a transport that has not been started throws
        /// <see cref="SwarmException"/>, and that adding a peer that was never started
        /// times out without the started transport receiving any message.
        /// </summary>
        /// <returns>A task that completes when the test has finished.</returns>
        [Fact(Timeout = Timeout)]
        public async Task Start()
        {
            var transportA = CreateTestTransport();
            var transportB = CreateTestTransport();

            // Neither transport is running yet, so the ping must be rejected.
            Assert.Throws<SwarmException>(() => transportA.SendPing(transportB.AsPeer));

            try
            {
                await StartTestTransportAsync(transportA);

                // transportB is never started, so the attempt is expected to time out.
                await Assert.ThrowsAsync<TimeoutException>(() =>
                    transportA.AddPeersAsync(
                        new[] { transportB.AsPeer },
                        TimeSpan.FromMilliseconds(500)));
                Assert.Empty(transportA.ReceivedMessages);
            }
            finally
            {
                // transportA was started above; stop it so it does not keep running
                // after the test, whether or not the assertions passed.
                await transportA.StopAsync(TimeSpan.Zero);
            }
        }
70

71
        /// <summary>
        /// Sends one ping between two started transports and expects each side to have
        /// received exactly one message, with the sender listed in the receiver's peers.
        /// </summary>
        /// <returns>A task that completes when the test has finished.</returns>
        [Fact(Timeout = Timeout)]
        public async Task Ping()
        {
            TestTransport sender = CreateTestTransport();
            TestTransport receiver = CreateTestTransport();

            try
            {
                await StartTestTransportAsync(sender);
                await StartTestTransportAsync(receiver);

                sender.SendPing(receiver.AsPeer);

                // Wait until the sender has received a message, then allow a further
                // 100 ms before inspecting both sides.
                await sender.MessageReceived.WaitAsync();
                await Task.Delay(100);

                Assert.Single(sender.ReceivedMessages);
                Assert.Single(receiver.ReceivedMessages);
                Assert.Contains(sender.AsPeer, receiver.Protocol.Peers);
            }
            finally
            {
                await sender.StopAsync(TimeSpan.Zero);
                await receiver.StopAsync(TimeSpan.Zero);
            }
        }
95

96
        /// <summary>
        /// Pings in both directions between two started transports and expects each
        /// side to end up with two received messages and the other side in its peers.
        /// </summary>
        /// <returns>A task that completes when the test has finished.</returns>
        [Fact(Timeout = Timeout)]
        public async Task PingTwice()
        {
            TestTransport first = CreateTestTransport();
            TestTransport second = CreateTestTransport();

            try
            {
                await StartTestTransportAsync(first);
                await StartTestTransportAsync(second);

                // Round one: first pings second; wait for a message on both sides.
                first.SendPing(second.AsPeer);
                await first.MessageReceived.WaitAsync();
                await second.MessageReceived.WaitAsync();

                // Round two: second pings first; wait for a message on both sides again.
                second.SendPing(first.AsPeer);
                await first.MessageReceived.WaitAsync();
                await second.MessageReceived.WaitAsync();

                Assert.Equal(2, first.ReceivedMessages.Count);
                Assert.Equal(2, second.ReceivedMessages.Count);
                Assert.Contains(first.AsPeer, second.Protocol.Peers);
                Assert.Contains(second.AsPeer, first.Protocol.Peers);
            }
            finally
            {
                await first.StopAsync(TimeSpan.Zero);
                await second.StopAsync(TimeSpan.Zero);
            }
        }
125

126
        /// <summary>
        /// Verifies that adding a peer that has been stopped times out, while a peer
        /// that is still running can be added again afterwards.
        /// </summary>
        /// <returns>A task that completes when the test has finished.</returns>
        [Fact(Timeout = Timeout)]
        public async Task PingToClosedPeer()
        {
            var transportA = CreateTestTransport();
            var transportB = CreateTestTransport();
            var transportC = CreateTestTransport();

            try
            {
                await StartTestTransportAsync(transportA);
                await StartTestTransportAsync(transportB);
                await StartTestTransportAsync(transportC);

                await transportA.AddPeersAsync(
                    new[] { transportB.AsPeer, transportC.AsPeer },
                    null);

                Assert.Contains(transportB.AsPeer, transportA.Protocol.Peers);
                Assert.Contains(transportC.AsPeer, transportA.Protocol.Peers);

                // Once C is stopped, adding it again is expected to time out.
                await transportC.StopAsync(TimeSpan.Zero);
                await Assert.ThrowsAsync<TimeoutException>(
                    () => transportA.AddPeersAsync(
                        new[] { transportC.AsPeer },
                        TimeSpan.FromSeconds(3)));
                await transportA.AddPeersAsync(new[] { transportB.AsPeer }, null);

                Assert.Contains(transportB.AsPeer, transportA.Protocol.Peers);
            }
            finally
            {
                // Stop everything even when an assertion above failed, so no
                // transport is left running after the test.
                await transportA.StopAsync(TimeSpan.Zero);
                await transportB.StopAsync(TimeSpan.Zero);
                await transportC.StopAsync(TimeSpan.Zero);
            }
        }
155

156
        /// <summary>
        /// Expects <c>BootstrapAsync</c> to throw <see cref="SwarmException"/> when
        /// neither the bootstrapping transport nor its seed has been started.
        /// </summary>
        /// <returns>A task that completes when the test has finished.</returns>
        [Fact(Timeout = Timeout)]
        public async Task BootstrapException()
        {
            TestTransport seed = CreateTestTransport();
            TestTransport bootstrapper = CreateTestTransport();

            // The same three-second value is used for both time-span arguments.
            TimeSpan threeSeconds = TimeSpan.FromSeconds(3);

            await Assert.ThrowsAsync<SwarmException>(
                () => bootstrapper.BootstrapAsync(
                    new[] { seed.AsPeer },
                    threeSeconds,
                    threeSeconds));
        }
169

170
        /// <summary>
        /// Bootstraps two transports from a third and expects them to learn about each
        /// other; then clears every table, stops one transport and expects a fresh
        /// bootstrap to yield only the transport that is still running.
        /// </summary>
        /// <returns>A task that completes when the test has finished.</returns>
        [Fact(Timeout = Timeout)]
        public async Task BootstrapAsyncTest()
        {
            TestTransport first = CreateTestTransport();
            TestTransport second = CreateTestTransport();
            TestTransport third = CreateTestTransport();
            TestTransport[] all = { first, second, third };

            try
            {
                foreach (TestTransport transport in all)
                {
                    await StartTestTransportAsync(transport);
                }

                await second.BootstrapAsync(new[] { first.AsPeer });
                await third.BootstrapAsync(new[] { first.AsPeer });

                Assert.Contains(second.AsPeer, third.Protocol.Peers);
                Assert.Contains(third.AsPeer, second.Protocol.Peers);

                // Start the second scenario from empty tables.
                foreach (TestTransport transport in all)
                {
                    ((KademliaProtocol)transport.Protocol).ClearTable();
                }

                await second.AddPeersAsync(new[] { third.AsPeer }, null);
                await third.StopAsync(TimeSpan.Zero);
                await first.BootstrapAsync(new[] { second.AsPeer });

                Assert.Contains(second.AsPeer, first.Peers);
                Assert.DoesNotContain(third.AsPeer, first.Peers);
            }
            finally
            {
                foreach (TestTransport transport in all)
                {
                    await transport.StopAsync(TimeSpan.Zero);
                }
            }
        }
206

207
        /// <summary>
        /// Verifies that a peer that has been stopped is no longer in the other
        /// transport's peer list after <c>RefreshTableAsync</c> is called with a zero
        /// time span.
        /// </summary>
        /// <returns>A task that completes when the test has finished.</returns>
        [Fact(Timeout = Timeout)]
        public async Task RemoveStalePeers()
        {
            var transportA = CreateTestTransport();
            var transportB = CreateTestTransport();

            try
            {
                await StartTestTransportAsync(transportA);
                await StartTestTransportAsync(transportB);

                await transportA.AddPeersAsync(new[] { transportB.AsPeer }, null);
                Assert.Single(transportA.Protocol.Peers);

                await transportB.StopAsync(TimeSpan.Zero);
                await Task.Delay(100);
                await transportA.Protocol.RefreshTableAsync(
                    TimeSpan.Zero,
                    default(CancellationToken));
                Assert.Empty(transportA.Protocol.Peers);
            }
            finally
            {
                // Stop both transports even when an assertion above failed, so no
                // transport is left running after the test.
                await transportA.StopAsync(TimeSpan.Zero);
                await transportB.StopAsync(TimeSpan.Zero);
            }
        }
225

226
        /// <summary>
        /// Verifies that a transport created with <c>tableSize: 1</c> and
        /// <c>bucketSize: 1</c> keeps only the first of three transports that add it
        /// as a peer.
        /// </summary>
        /// <returns>A task that completes when the test has finished.</returns>
        [Fact(Timeout = Timeout)]
        public async Task RoutingTableFull()
        {
            var transport = CreateTestTransport(tableSize: 1, bucketSize: 1);
            var transportA = CreateTestTransport();
            var transportB = CreateTestTransport();
            var transportC = CreateTestTransport();

            try
            {
                await StartTestTransportAsync(transport);
                await StartTestTransportAsync(transportA);
                await StartTestTransportAsync(transportB);
                await StartTestTransportAsync(transportC);

                await transportA.AddPeersAsync(new[] { transport.AsPeer }, null);
                await transportB.AddPeersAsync(new[] { transport.AsPeer }, null);
                await transportC.AddPeersAsync(new[] { transport.AsPeer }, null);

                Assert.Single(transportA.Protocol.Peers);
                Assert.Contains(transportA.AsPeer, transport.Protocol.Peers);
                Assert.DoesNotContain(transportB.AsPeer, transport.Protocol.Peers);
                Assert.DoesNotContain(transportC.AsPeer, transport.Protocol.Peers);
            }
            finally
            {
                // Stop everything even when an assertion above failed, so no
                // transport is left running after the test.
                await transport.StopAsync(TimeSpan.Zero);
                await transportA.StopAsync(TimeSpan.Zero);
                await transportB.StopAsync(TimeSpan.Zero);
                await transportC.StopAsync(TimeSpan.Zero);
            }
        }
253

254
        /// <summary>
        /// Verifies that, for a transport created with <c>tableSize: 1</c> and
        /// <c>bucketSize: 1</c>, stopping the only listed peer and then calling
        /// <c>RefreshTableAsync</c> and <c>CheckReplacementCacheAsync</c> leaves the
        /// second transport that contacted it as the single listed peer.
        /// </summary>
        /// <returns>A task that completes when the test has finished.</returns>
        [Fact(Timeout = Timeout)]
        public async Task ReplacementCache()
        {
            var transport = CreateTestTransport(tableSize: 1, bucketSize: 1);
            var transportA = CreateTestTransport();
            var transportB = CreateTestTransport();
            var transportC = CreateTestTransport();

            try
            {
                await StartTestTransportAsync(transport);
                await StartTestTransportAsync(transportA);
                await StartTestTransportAsync(transportB);
                await StartTestTransportAsync(transportC);

                await transportA.AddPeersAsync(new[] { transport.AsPeer }, null);
                await transportB.AddPeersAsync(new[] { transport.AsPeer }, null);

                // NOTE(review): the pause presumably ensures B is recorded before C --
                // confirm against the replacement-cache ordering rules.
                await Task.Delay(100);
                await transportC.AddPeersAsync(new[] { transport.AsPeer }, null);

                Assert.Single(transportA.Protocol.Peers);
                Assert.Contains(transportA.AsPeer, transport.Protocol.Peers);
                Assert.DoesNotContain(transportB.AsPeer, transport.Protocol.Peers);

                await transportA.StopAsync(TimeSpan.Zero);
                await transport.Protocol.RefreshTableAsync(
                    TimeSpan.Zero,
                    default(CancellationToken));
                await transport.Protocol.CheckReplacementCacheAsync(default(CancellationToken));

                Assert.Single(transport.Protocol.Peers);
                Assert.DoesNotContain(transportA.AsPeer, transport.Protocol.Peers);
                Assert.Contains(transportB.AsPeer, transport.Protocol.Peers);
                Assert.DoesNotContain(transportC.AsPeer, transport.Protocol.Peers);
            }
            finally
            {
                // Stop everything even when an assertion above failed. transportA is
                // included because a failure may occur before it is stopped in the body.
                await transport.StopAsync(TimeSpan.Zero);
                await transportA.StopAsync(TimeSpan.Zero);
                await transportB.StopAsync(TimeSpan.Zero);
                await transportC.StopAsync(TimeSpan.Zero);
            }
        }
289

290
        /// <summary>
        /// Verifies that, for a transport created with <c>tableSize: 1</c> and
        /// <c>bucketSize: 1</c>, neither of two stopped transports remains listed after
        /// <c>RefreshTableAsync</c> and <c>CheckReplacementCacheAsync</c>, and that a
        /// third, still-running transport becomes the single listed peer.
        /// </summary>
        /// <returns>A task that completes when the test has finished.</returns>
        [Fact(Timeout = Timeout)]
        public async Task RemoveDeadReplacementCache()
        {
            var transport = CreateTestTransport(tableSize: 1, bucketSize: 1);
            var transportA = CreateTestTransport();
            var transportB = CreateTestTransport();
            var transportC = CreateTestTransport();

            try
            {
                await StartTestTransportAsync(transport);
                await StartTestTransportAsync(transportA);
                await StartTestTransportAsync(transportB);
                await StartTestTransportAsync(transportC);

                await transportA.AddPeersAsync(new[] { transport.AsPeer }, null);
                await transportB.AddPeersAsync(new[] { transport.AsPeer }, null);

                Assert.Single(transport.Protocol.Peers);
                Assert.Contains(transportA.AsPeer, transport.Protocol.Peers);
                Assert.DoesNotContain(transportB.AsPeer, transport.Protocol.Peers);

                await transportA.StopAsync(TimeSpan.Zero);
                await transportB.StopAsync(TimeSpan.Zero);

                await transportC.AddPeersAsync(new[] { transport.AsPeer }, null);
                await transport.Protocol.RefreshTableAsync(
                    TimeSpan.Zero,
                    default(CancellationToken));
                await transport.Protocol.CheckReplacementCacheAsync(default(CancellationToken));

                Assert.Single(transport.Protocol.Peers);
                Assert.DoesNotContain(transportA.AsPeer, transport.Protocol.Peers);
                Assert.DoesNotContain(transportB.AsPeer, transport.Protocol.Peers);
                Assert.Contains(transportC.AsPeer, transport.Protocol.Peers);
            }
            finally
            {
                // Stop everything even when an assertion above failed. A and B are
                // included because a failure may occur before they are stopped in the body.
                await transport.StopAsync(TimeSpan.Zero);
                await transportA.StopAsync(TimeSpan.Zero);
                await transportB.StopAsync(TimeSpan.Zero);
                await transportC.StopAsync(TimeSpan.Zero);
            }
        }
325

326
        /// <summary>
        /// Bootstraps <paramref name="count"/> transports from one seed, broadcasts a
        /// test message with data <c>"foo"</c> from the seed, and expects every
        /// transport to receive it.
        /// </summary>
        /// <param name="count">Number of transports bootstrapped from the seed.</param>
        /// <returns>A task that completes when the test has finished.</returns>
        [Theory(Timeout = 2 * Timeout)]
        [InlineData(1)]
        [InlineData(5)]
        [InlineData(20)]
        [InlineData(50)]
        public async Task BroadcastMessage(int count)
        {
            var seed = CreateTestTransport();
            var transports = new TestTransport[count];

            try
            {
                await StartTestTransportAsync(seed);
                for (var i = 0; i < count; i++)
                {
                    transports[i] = CreateTestTransport();
                    await StartTestTransportAsync(transports[i]);
                }

                foreach (var transport in transports)
                {
                    await transport.BootstrapAsync(new[] { seed.AsPeer });
                }

                Log.Debug("Bootstrap completed.");

                // Materialize the query so every wait is created before the broadcast
                // is sent; a deferred Select would only create them inside WhenAll.
                var tasks = transports
                    .Select(t => t.WaitForTestMessageWithData("foo"))
                    .ToArray();

                seed.BroadcastTestMessage(null, "foo");
                Log.Debug("Broadcast completed.");

                await Task.WhenAll(tasks);

                // Assert here rather than in finally, so that an assertion failure
                // can neither hide an earlier exception nor interrupt the cleanup.
                foreach (var transport in transports)
                {
                    Assert.True(transport.ReceivedTestMessageOfData("foo"));
                }
            }
            finally
            {
                foreach (var transport in transports)
                {
                    // Slots stay null if starting the transports failed part-way.
                    if (transport != null)
                    {
                        await transport.StopAsync(TimeSpan.Zero);
                    }
                }

                await seed.StopAsync(TimeSpan.Zero);
            }
        }
368

369
        /// <summary>
        /// Broadcasts four test messages from a seed and expects <c>t2</c> to receive
        /// each one within five seconds, while <c>t1</c> is created with
        /// <c>blockBroadcast: true</c>.
        /// </summary>
        /// <returns>A task that completes when the test has finished.</returns>
        [Fact(Timeout = Timeout)]
        public async Task BroadcastGuarantee()
        {
            // Make sure t1 and t2 are in the same bucket of the seed's routing table.
            var privateKey0 = new PrivateKey(new byte[]
            {
                0x1a, 0x55, 0x30, 0x84, 0xe8, 0x9e, 0xee, 0x1e, 0x9f, 0xe2, 0xd1, 0x49, 0xe7, 0xa9,
                0x53, 0xa9, 0xb4, 0xe4, 0xfe, 0x5a, 0xc1, 0x6c, 0x61, 0x9f, 0x54, 0x8f, 0x5e, 0xd9,
                0x7f, 0xa3, 0xa0, 0x79,
            });
            var privateKey1 = new PrivateKey(new byte[]
            {
                0x8e, 0x26, 0x31, 0x4a, 0xee, 0x84, 0xd, 0x8a, 0xea, 0x7b, 0x6, 0xf8, 0x81, 0x5f,
                0x69, 0xb3, 0x44, 0x46, 0xe0, 0x27, 0x65, 0x17, 0x1, 0x16, 0x58, 0x26, 0x69, 0x93,
                0x48, 0xbb, 0xf, 0xb4,
            });
            var privateKey2 = new PrivateKey(new byte[]
            {
                0xd4, 0x6b, 0x4b, 0x38, 0xde, 0x39, 0x25, 0x3b, 0xd8, 0x1, 0x9d, 0x2, 0x2, 0x7a,
                0x90, 0x9, 0x46, 0x2f, 0xc1, 0xd3, 0xd9, 0xa, 0xa6, 0xf4, 0xfa, 0x9a, 0x6, 0xa3,
                0x60, 0xed, 0xf3, 0xd7,
            });

            var seed = CreateTestTransport(privateKey0);
            var t1 = CreateTestTransport(privateKey1, blockBroadcast: true);
            var t2 = CreateTestTransport(privateKey2);

            try
            {
                await StartTestTransportAsync(seed);
                await StartTestTransportAsync(t1);
                await StartTestTransportAsync(t2);

                await t1.BootstrapAsync(new[] { seed.AsPeer });
                await t2.BootstrapAsync(new[] { seed.AsPeer });

                Log.Debug(seed.Protocol.Trace());

                Log.Debug("Bootstrap completed.");

                foreach (string data in new[] { "foo", "bar", "baz", "qux" })
                {
                    // One token source per round, disposed when the round ends.
                    using (var cts = new CancellationTokenSource())
                    {
                        // Begin waiting before the broadcast is sent.
                        var task = t2.WaitForTestMessageWithData(data, cts.Token);

                        seed.BroadcastTestMessage(null, data);

                        // ":l" renders the string without Serilog's added quotes, so
                        // the log line reads: Broadcast "foo" completed.
                        Log.Debug("Broadcast \"{Data:l}\" completed.", data);

                        // Give t2 at most five seconds to receive the message.
                        cts.CancelAfter(TimeSpan.FromSeconds(5));
                        await task;

                        Assert.True(t2.ReceivedTestMessageOfData(data));
                    }
                }
            }
            finally
            {
                await seed.StopAsync(TimeSpan.Zero);
                await t1.StopAsync(TimeSpan.Zero);
                await t2.StopAsync(TimeSpan.Zero);
            }
        }
459

460
        /// <summary>
        /// With A knowing B and B knowing C, broadcasts a test message from A and
        /// expects C to receive it while A itself does not.
        /// </summary>
        /// <returns>A task that completes when the test has finished.</returns>
        [Fact(Timeout = Timeout)]
        public async Task DoNotBroadcastToSourcePeer()
        {
            TestTransport transportA = CreateTestTransport(new PrivateKey());
            TestTransport transportB = CreateTestTransport(new PrivateKey());
            TestTransport transportC = CreateTestTransport(new PrivateKey());

            try
            {
                // Started inside the try so that a failure while starting a later
                // transport still stops the ones that are already running.
                await StartTestTransportAsync(transportA);
                await StartTestTransportAsync(transportB);
                await StartTestTransportAsync(transportC);

                await transportA.AddPeersAsync(new[] { transportB.AsPeer }, null);
                await transportB.AddPeersAsync(new[] { transportC.AsPeer }, null);

                transportA.BroadcastTestMessage(null, "foo");
                await transportC.WaitForTestMessageWithData("foo");

                // Allow a further 100 ms before checking that A did not get it back.
                await Task.Delay(100);

                Assert.True(transportC.ReceivedTestMessageOfData("foo"));
                Assert.False(transportA.ReceivedTestMessageOfData("foo"));
            }
            finally
            {
                await transportA.StopAsync(TimeSpan.Zero);
                await transportB.StopAsync(TimeSpan.Zero);
                await transportC.StopAsync(TimeSpan.Zero);
            }
        }
490

491
        /// <summary>
        /// Builds a <see cref="TestTransport"/> that is given this class's
        /// transport dictionary together with the supplied options.
        /// </summary>
        /// <param name="privateKey">Key for the transport; a new <see cref="PrivateKey"/>
        /// is created when this is <c>null</c>.</param>
        /// <param name="blockBroadcast">Forwarded unchanged to the transport.</param>
        /// <param name="tableSize">Forwarded unchanged to the transport.</param>
        /// <param name="bucketSize">Forwarded unchanged to the transport.</param>
        /// <param name="networkDelay">Forwarded unchanged to the transport.</param>
        /// <returns>The newly constructed transport.</returns>
        private TestTransport CreateTestTransport(
            PrivateKey privateKey = null,
            bool blockBroadcast = false,
            int? tableSize = null,
            int? bucketSize = null,
            TimeSpan? networkDelay = null)
        {
            PrivateKey key = privateKey ?? new PrivateKey();
            var created = new TestTransport(
                _transports,
                key,
                blockBroadcast,
                tableSize,
                bucketSize,
                networkDelay);
            return created;
        }
506

507
        /// <summary>
        /// Awaits <c>StartAsync</c> on the transport and then calls <c>RunAsync</c>
        /// without awaiting it.
        /// </summary>
        /// <param name="transport">The transport to start.</param>
        /// <param name="cancellationToken">Token passed to both <c>StartAsync</c> and
        /// <c>RunAsync</c>.</param>
        /// <returns>A task that completes once <c>StartAsync</c> has finished; its result
        /// is the task returned by <c>RunAsync</c>.</returns>
        private async Task<Task> StartTestTransportAsync(
            TestTransport transport,
            CancellationToken cancellationToken = default(CancellationToken))
        {
            await transport.StartAsync(cancellationToken);

            // Hand the run task back un-awaited so callers decide whether to observe it.
            Task running = transport.RunAsync(cancellationToken);
            return running;
        }
514
    }
515
}

Read our documentation on viewing source code.

Loading