#69 Added Throttle support for async imports

Open Bilge

No flags found

Use flags to group coverage reports by test type, project and/or folders.
Then setup custom commit statuses and notifications for each flag.

e.g., #unittest #integration

#production #enterprise

#frontend #backend

Learn more about Codecov Flags here.


@@ -3,6 +3,8 @@
Loading
3 3
4 4
namespace ScriptFUSION\Porter\Specification;
5 5
6 +
use ScriptFUSION\Async\Throttle\NullThrottle;
7 +
use ScriptFUSION\Async\Throttle\Throttle;
6 8
use ScriptFUSION\Porter\Connector\Recoverable\ExponentialAsyncDelayRecoverableExceptionHandler;
7 9
use ScriptFUSION\Porter\Connector\Recoverable\RecoverableExceptionHandler;
8 10
use ScriptFUSION\Porter\Provider\Resource\AsyncResource;
@@ -15,6 +17,9 @@
Loading
15 17
{
16 18
    private $asyncResource;
17 19
20 +
    /** @var Throttle */
21 +
    private $throttle;
22 +
18 23
    /**
19 24
     * Initializes this instance with the specified asynchronous resource.
20 25
     *
@@ -30,6 +35,7 @@
Loading
30 35
    public function __clone()
31 36
    {
32 37
        $this->asyncResource = clone $this->asyncResource;
38 +
        // Throttle is not cloned because it most likely wants to be shared between imports.
33 39
34 40
        parent::__clone();
35 41
    }
@@ -53,4 +59,28 @@
Loading
53 59
    {
54 60
        return new ExponentialAsyncDelayRecoverableExceptionHandler;
55 61
    }
62 +
63 +
    /**
64 +
     * Gets the asynchronous connection throttle, invoked each time a connector fetches data.
65 +
     *
66 +
     * @return Throttle Asynchronous connection throttle.
67 +
     */
68 +
    final public function getThrottle(): Throttle
69 +
    {
70 +
        return $this->throttle ?? $this->throttle = new NullThrottle;
71 +
    }
72 +
73 +
    /**
74 +
     * Sets the asynchronous connection throttle, invoked each time a connector fetches data.
75 +
     *
76 +
     * @param Throttle $throttle Asynchronous connection throttle.
77 +
     *
78 +
     * @return $this
79 +
     */
80 +
    final public function setThrottle(Throttle $throttle): self
81 +
    {
82 +
        $this->throttle = $throttle;
83 +
84 +
        return $this;
85 +
    }
56 86
}

@@ -4,10 +4,12 @@
Loading
4 4
namespace ScriptFUSION\Porter\Connector;
5 5
6 6
use Amp\Promise;
7 +
use ScriptFUSION\Async\Throttle\Throttle;
7 8
use ScriptFUSION\Porter\Cache\CacheUnavailableException;
8 9
use ScriptFUSION\Porter\Connector\Recoverable\RecoverableException;
9 10
use ScriptFUSION\Porter\Connector\Recoverable\RecoverableExceptionHandler;
10 11
use ScriptFUSION\Porter\Connector\Recoverable\StatelessRecoverableExceptionHandler;
12 +
use function Amp\call;
11 13
use function Amp\Promise\all;
12 14
use function ScriptFUSION\Retry\retry;
13 15
use function ScriptFUSION\Retry\retryAsync;
@@ -40,6 +42,8 @@
Loading
40 42
41 43
    private $maxFetchAttempts;
42 44
45 +
    private $throttle;
46 +
43 47
    /**
44 48
     * @param Connector|AsyncConnector $connector Wrapped connector.
45 49
     * @param RecoverableExceptionHandler $recoverableExceptionHandler User's recoverable exception handler.
@@ -50,7 +54,8 @@
Loading
50 54
        $connector,
51 55
        RecoverableExceptionHandler $recoverableExceptionHandler,
52 56
        int $maxFetchAttempts,
53 -
        bool $mustCache
57 +
        bool $mustCache,
58 +
        ?Throttle $throttle
54 59
    ) {
55 60
        if ($mustCache && !$connector instanceof CachingConnector) {
56 61
            throw CacheUnavailableException::createUnsupported();
@@ -64,6 +69,7 @@
Loading
64 69
        );
65 70
        $this->userExceptionHandler = $recoverableExceptionHandler;
66 71
        $this->maxFetchAttempts = $maxFetchAttempts;
72 +
        $this->throttle = $throttle;
67 73
    }
68 74
69 75
    /**
@@ -96,7 +102,15 @@
Loading
96 102
        return retryAsync(
97 103
            $this->maxFetchAttempts,
98 104
            function () use ($source): Promise {
99 -
                return $this->connector->fetchAsync($source);
105 +
                return call(function () use ($source): \Generator {
106 +
                    while (!yield $this->throttle->join()) {
107 +
                        // Throttle is choked. Wait for free slot.
108 +
                    }
109 +
110 +
                    yield $this->throttle->await($response = $this->connector->fetchAsync($source));
111 +
112 +
                    return yield $response;
113 +
                });
100 114
            },
101 115
            $this->createExceptionHandler()
102 116
        );

@@ -3,6 +3,7 @@
Loading
3 3
4 4
namespace ScriptFUSION\Porter\Connector;
5 5
6 +
use ScriptFUSION\Porter\Specification\AsyncImportSpecification;
6 7
use ScriptFUSION\Porter\Specification\Specification;
7 8
use ScriptFUSION\StaticClass;
8 9
@@ -22,7 +23,8 @@
Loading
22 23
            $connector,
23 24
            $specification->getRecoverableExceptionHandler(),
24 25
            $specification->getMaxFetchAttempts(),
25 -
            $specification->mustCache()
26 +
            $specification->mustCache(),
27 +
            $specification instanceof AsyncImportSpecification ? $specification->getThrottle() : null
26 28
        );
27 29
    }
28 30
}

Everything is accounted for!

No changes detected that need to be reviewed.
What changes does Codecov check for?
Lines, not adjusted in diff, that have changed coverage data.
Files that introduced coverage data that had none before.
Files that have missing coverage data that once were tracked.
Files Complexity Coverage
src ø 100.00%
Project Totals (37 files) 230 100.00%
Loading