/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once namespace folly { namespace detail { template TimePoint roundUpTimePoint(TimePoint t, Duration d) { auto remainder = t.time_since_epoch() % d; if (remainder.count() != 0) { return t + d - remainder; } return t; } template void removeEmpty(std::vector& v) { const auto& isEmpty = [](const T& val) { return val.empty(); }; v.erase(std::remove_if(v.begin(), v.end(), isEmpty), v.end()); } template BufferedStat::BufferedStat( typename ClockT::duration bufferDuration, size_t bufferSize, size_t digestSize) : bufferDuration_(bufferDuration), expiry_(roundUp(ClockT::now())), digestBuilder_(bufferSize, digestSize) {} template void BufferedStat::append(double value, TimePoint now) { if (FOLLY_UNLIKELY(now > expiry_.load(std::memory_order_relaxed))) { std::unique_lock g(mutex_, std::try_to_lock_t()); if (g.owns_lock()) { doUpdate(now, g, UpdateMode::OnExpiry); } } digestBuilder_.append(value); } template typename BufferedStat::TimePoint BufferedStat::roundUp(TimePoint t) { return roundUpTimePoint(t, bufferDuration_); } template std::unique_lock BufferedStat::updateIfExpired( TimePoint now) { std::unique_lock g(mutex_); doUpdate(now, g, UpdateMode::OnExpiry); return g; } template void BufferedStat::flush() { std::unique_lock g(mutex_); doUpdate(ClockT::now(), g, UpdateMode::Now); } template void BufferedStat::doUpdate( TimePoint now, const std::unique_lock& g, UpdateMode updateMode) { assert(g.owns_lock()); // Check that no other thread has performed the slide after the check auto oldExpiry = expiry_.load(std::memory_order_relaxed); if (now > oldExpiry || updateMode == UpdateMode::Now) { now = roundUp(now); expiry_.store(now, std::memory_order_relaxed); onNewDigest(digestBuilder_.build(), now, oldExpiry, g); } } template BufferedDigest::BufferedDigest( typename ClockT::duration bufferDuration, size_t bufferSize, size_t digestSize) : BufferedStat(bufferDuration, bufferSize, digestSize), digest_(digestSize) {} template DigestT BufferedDigest::get(TimePoint now) { auto g = this->updateIfExpired(now); return digest_; } template void BufferedDigest::onNewDigest( DigestT digest, TimePoint /*newExpiry*/, TimePoint /*oldExpiry*/, const std::unique_lock& /*g*/) { digest_ = DigestT::merge(digest_, digest); } template BufferedSlidingWindow::BufferedSlidingWindow( size_t nBuckets, typename ClockT::duration bufferDuration, size_t bufferSize, size_t digestSize) : BufferedStat(bufferDuration, bufferSize, digestSize), slidingWindow_([=]() { return DigestT(digestSize); }, nBuckets) {} template std::vector BufferedSlidingWindow::get( TimePoint now) { std::vector digests; { auto g = this->updateIfExpired(now); digests = slidingWindow_.get(); } removeEmpty(digests); return digests; } template void BufferedSlidingWindow::onNewDigest( DigestT digest, TimePoint newExpiry, TimePoint oldExpiry, const std::unique_lock& /*g*/) { if (newExpiry > oldExpiry) { auto diff = (newExpiry - oldExpiry) / this->bufferDuration_; slidingWindow_.slide(diff); slidingWindow_.set(diff - 1, std::move(digest)); } else { // just update current window slidingWindow_.set( 0 /* current window */, DigestT::merge(slidingWindow_.front(), digest)); } } template BufferedMultiSlidingWindow::Window::Window( TimePoint firstExpiry, std::chrono::seconds bucketDur, size_t nBuckets, size_t digestSize) : bucketDuration(bucketDur), expiry(roundUpTimePoint(firstExpiry, bucketDur)), curBucket(digestSize), slidingWindow([=] { return DigestT(digestSize); }, nBuckets) {} template BufferedMultiSlidingWindow::BufferedMultiSlidingWindow( Range defs, size_t bufferSize, size_t digestSize) : BufferedStat( std::chrono::seconds{1}, bufferSize, digestSize), digestSize_(digestSize), allTime_(digestSize) { for (const auto& def : defs) { windows_.emplace_back( this->expiry_.load(std::memory_order_relaxed), def.first, def.second, digestSize); } } template auto BufferedMultiSlidingWindow::get(TimePoint now) -> Digests { auto digests = [&] { std::vector> windowDigests; windowDigests.reserve(windows_.size()); auto g = this->updateIfExpired(now); for (const auto& window : windows_) { windowDigests.push_back(window.slidingWindow.get()); } return Digests{allTime_, std::move(windowDigests)}; }(); for (auto& w : digests.windows) { removeEmpty(w); } return digests; } template void BufferedMultiSlidingWindow::onNewDigest( DigestT digest, TimePoint newExpiry, TimePoint oldExpiry, const std::unique_lock& /* g */) { for (auto& window : windows_) { assert(oldExpiry <= window.expiry); auto& curBucket = window.curBucket; auto& slidingWindow = window.slidingWindow; if (newExpiry > oldExpiry) { curBucket = DigestT::merge(curBucket, digest); if (newExpiry > window.expiry) { auto newWindowExpiry = roundUpTimePoint(newExpiry, window.bucketDuration); auto diff = (newWindowExpiry - window.expiry) / window.bucketDuration; slidingWindow.slide(diff); slidingWindow.set( diff - 1, std::exchange(curBucket, DigestT{digestSize_})); window.expiry = newWindowExpiry; } } else { // This is a flush, merge curBucket in even if it hasn't expired. std::array a{ &slidingWindow.front(), &curBucket, &digest}; slidingWindow.set(0, DigestT::merge(range(a))); curBucket = DigestT{digestSize_}; } } allTime_ = DigestT::merge(allTime_, digest); } } // namespace detail } // namespace folly