/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include #include #include #include #if FOLLY_HAS_COROUTINES namespace folly { namespace coro { // An AsyncGenerator with a write end // // Usage: // auto pipe = AsyncPipe::create(); // pipe.second.write(std::move(val1)); // auto val2 = co_await pipe.first.next(); // // write() returns false if the read end has been destroyed (unless // SingleProducer is disabled, in which case this behavior is undefined). // The generator is completed when the write end is destroyed or on close() // close() can also be passed an exception, which is thrown when read. // // An optional onClosed callback can be passed to create(). This callback will // be called either when the generator is destroyed by the consumer, or when // the pipe is closed by the publisher (whichever comes first). The onClosed // callback may destroy the AsyncPipe object inline, and must not call close() // on the AsyncPipe object inline. If an onClosed callback is specified and the // publisher would like to destroy the pipe outside of the callback, it must // first close the pipe. // // If SingleProducer is disabled, AsyncPipe's write() method (but not its // close() method) becomes thread-safe. close() must be sequenced after all // write()s in this mode. template < typename T, bool SingleProducer = true, template typename QueueType = SmallUnboundedQueue> class AsyncPipe { public: ~AsyncPipe() { CHECK(!onClosed_ || onClosed_->wasInvokeRequested()) << "If an onClosed callback is specified and the generator still " << "exists, the publisher must explicitly close the pipe prior to " << "destruction."; std::move(*this).close(); } AsyncPipe(AsyncPipe&& pipe) noexcept { queue_ = std::move(pipe.queue_); onClosed_ = std::move(pipe.onClosed_); } AsyncPipe& operator=(AsyncPipe&& pipe) { if (this != &pipe) { CHECK(!onClosed_ || onClosed_->wasInvokeRequested()) << "If an onClosed callback is specified and the generator still " << "exists, the publisher must explicitly close the pipe prior to " << "destruction."; std::move(*this).close(); queue_ = std::move(pipe.queue_); onClosed_ = std::move(pipe.onClosed_); } return *this; } static std::pair, AsyncPipe> create( folly::Function onClosed = nullptr) { auto queue = std::make_shared(); auto cancellationSource = std::optional(); auto onClosedCallback = std::unique_ptr(); if (onClosed != nullptr) { cancellationSource.emplace(); onClosedCallback = std::make_unique( *cancellationSource, std::move(onClosed)); } auto guard = folly::makeGuard([cancellationSource = std::move(cancellationSource)] { if (cancellationSource) { cancellationSource->requestCancellation(); } }); return { folly::coro::co_invoke( [queue, guard = std::move(guard)]() -> folly::coro::AsyncGenerator { while (true) { co_yield co_result(co_await co_nothrow(queue->dequeue())); } }), AsyncPipe(queue, std::move(onClosedCallback))}; } template bool write(U&& val) { if (auto queue = queue_.lock()) { queue->enqueue(folly::Try(std::forward(val))); return true; } return false; } void close(folly::exception_wrapper ew) && { if (auto queue = queue_.lock()) { queue->enqueue(folly::Try(std::move(ew))); queue_.reset(); } if (onClosed_ != nullptr) { onClosed_->requestInvoke(); onClosed_.reset(); } } void close() && { if (auto queue = queue_.lock()) { queue->enqueue(folly::Try()); queue_.reset(); } if (onClosed_ != nullptr) { onClosed_->requestInvoke(); onClosed_.reset(); } } bool isClosed() const { return queue_.expired(); } private: using Queue = QueueType, SingleProducer, true>; class OnClosedCallback { public: OnClosedCallback( folly::CancellationSource cancellationSource, folly::Function onClosedFunc) : cancellationSource_(std::move(cancellationSource)), cancellationCallback_( cancellationSource_.getToken(), std::move(onClosedFunc)) {} void requestInvoke() { cancellationSource_.requestCancellation(); } bool wasInvokeRequested() { return cancellationSource_.isCancellationRequested(); } private: folly::CancellationSource cancellationSource_; folly::CancellationCallback cancellationCallback_; }; explicit AsyncPipe( std::weak_ptr queue, std::unique_ptr onClosed) : queue_(std::move(queue)), onClosed_(std::move(onClosed)) {} std::weak_ptr queue_; std::unique_ptr onClosed_; }; // Bounded variant of AsyncPipe which buffers a fixed number of writes // before blocking new attempts to write until the buffer is drained. // // Usage: // auto [generator, pipe] = BoundedAsyncPipe::create(/* tokens */ 10); // co_await pipe.write(std::move(entry)); // auto entry = co_await generator.next().value(); // // write() is a coroutine which only blocks when // no capacity is remaining. write() returns false if the read-end has been // destroyed or was destroyed while blocking, only throwing OperationCanceled // if the parent coroutine was canceled while blocking. // // try_write() is offered which will never block, but will return false // and not write if no capacity is remaining or the read end is already // destroyed. // // close() functions the same as AsyncPipe, and must be invoked before // destruction if an onClose callback is attached. template < typename T, bool SingleProducer = true, template typename QueueType = SmallUnboundedQueue> class BoundedAsyncPipe { public: using Pipe = AsyncPipe; static std::pair, BoundedAsyncPipe> create( size_t tokens, folly::Function onClosed = nullptr) { auto [generator, pipe] = Pipe::create(std::move(onClosed)); auto semaphore = std::make_shared(tokens); folly::CancellationSource cancellationSource; auto cancellationToken = cancellationSource.getToken(); auto guard = folly::makeGuard( [cancellationSource = std::move(cancellationSource)]() { cancellationSource.requestCancellation(); }); auto signalingGenerator = co_invoke( [generator_2 = std::move(generator), guard = std::move(guard), semaphore]() mutable -> folly::coro::AsyncGenerator { while (true) { auto itemTry = co_await co_awaitTry(generator_2.next()); semaphore->signal(); co_yield co_result(std::move(itemTry)); } }); return std::pair, BoundedAsyncPipe>( std::move(signalingGenerator), BoundedAsyncPipe( std::move(pipe), std::move(semaphore), std::move(cancellationToken))); } template folly::coro::Task write(U&& u) { auto parentToken = co_await co_current_cancellation_token; auto waitResult = co_await co_awaitTry(co_withCancellation( folly::CancellationToken::merge( std::move(parentToken), cancellationToken_), semaphore_->co_wait())); if (cancellationToken_.isCancellationRequested()) { // eagerly return false if the read-end was destroyed instead of throwing // OperationCanceled, to have uniform behavior when the generator is // destroyed co_return false; } else if (waitResult.hasException()) { co_yield co_error(std::move(waitResult).exception()); } co_return pipe_.write(std::forward(u)); } template bool try_write(U&& u) { bool available = semaphore_->try_wait(); if (!available) { return false; } return pipe_.write(std::forward(u)); } size_t getAvailableSpace() { return semaphore_->getAvailableTokens(); } size_t getOccupiedSpace() { return semaphore_->getCapacity() - getAvailableSpace(); } void close(exception_wrapper&& w) && { std::move(pipe_).close(std::move(w)); } void close() && { std::move(pipe_).close(); } bool isClosed() const { return pipe_.isClosed(); } private: BoundedAsyncPipe( Pipe&& pipe, std::shared_ptr semaphore, folly::CancellationToken cancellationToken) : pipe_(std::move(pipe)), semaphore_(std::move(semaphore)), cancellationToken_(std::move(cancellationToken)) {} Pipe pipe_; std::shared_ptr semaphore_; folly::CancellationToken cancellationToken_; }; } // namespace coro } // namespace folly #endif // FOLLY_HAS_COROUTINES