/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include namespace folly { class AsyncSocketException; class EventBase; class AsyncTransport; class AsyncSocket; /** * Observer of socket events. */ class AsyncSocketObserverInterface { public: enum class Events {}; AsyncSocketObserverInterface() = default; virtual ~AsyncSocketObserverInterface() = default; /** * Information provided to observer during prewrite event. * * Based on this information, an observer can build a PrewriteRequest. */ struct PrewriteState { // raw byte stream offsets size_t startOffset{0}; size_t endOffset{0}; // flags already set WriteFlags writeFlags{WriteFlags::NONE}; // timestamp recorded at the AsyncSocket layer // // supports sequencing of PrewriteState events and ByteEvents for debug std::chrono::steady_clock::time_point ts = { std::chrono::steady_clock::now()}; }; /** * Request that can be generated by observer in response to prewrite event. * * An observer can use a PrewriteRequest to request WriteFlags to be added * to a write and/or to request that the write be split up, both of which * can be used for timestamping. */ struct PrewriteRequest { // offset to split write at; may be split at earlier offset by another req folly::Optional maybeOffsetToSplitWrite; // write flags to be added if write split at requested offset WriteFlags writeFlagsToAddAtOffset{WriteFlags::NONE}; // write flags to be added regardless of where write happens WriteFlags writeFlagsToAdd{WriteFlags::NONE}; }; /** * Container of PrewriteRequests passed on invocation of prewrite(). * * If an observer wants to make a PrewriteRequest, it adds its request to this * container. Each added request is merged into a PrewriteRequest that can be * fetched via getMergedRequest(). */ class PrewriteRequestContainer { public: explicit PrewriteRequestContainer( const AsyncSocketObserverInterface::PrewriteState& prewriteState) : prewriteState_(prewriteState) {} /** * Add a PrewriteRequest to the container. */ void addRequest( const AsyncSocketObserverInterface::PrewriteRequest& request) { mergedRequest_.writeFlagsToAdd |= request.writeFlagsToAdd; if (request.maybeOffsetToSplitWrite.has_value()) { CHECK_GE( prewriteState_.endOffset, request.maybeOffsetToSplitWrite.value()); if ( // case 1: offset not set in merged request !mergedRequest_.maybeOffsetToSplitWrite.has_value() || // case 2: offset in merged request > offset in current request mergedRequest_.maybeOffsetToSplitWrite > request.maybeOffsetToSplitWrite) { mergedRequest_.maybeOffsetToSplitWrite = request.maybeOffsetToSplitWrite; // update mergedRequest_.writeFlagsToAddAtOffset = request.writeFlagsToAddAtOffset; // reset } else if ( // case 3: offset in merged request == offset in current request request.maybeOffsetToSplitWrite == mergedRequest_.maybeOffsetToSplitWrite) { mergedRequest_.writeFlagsToAddAtOffset |= request.writeFlagsToAddAtOffset; // merge } // case 4: offset in merged request < offset in current request // (do nothing) } // if maybeOffsetToSplitWrite points to end of the vector, remove the // split if (mergedRequest_.maybeOffsetToSplitWrite.has_value() && // explicit mergedRequest_.maybeOffsetToSplitWrite == prewriteState_.endOffset) { mergedRequest_.maybeOffsetToSplitWrite.reset(); // no split needed } } /** * Returns the merged PrewriteREquest representing action to take. * * The merged request has the split point at the earliest offset requested * across all requests, and has flags set to be the union of all timestamp * flags requested across all requests. * * Examples: * * - If there are two PrewriteRequests, one requesting we split on byte * offset 20, and the other requesting a split on byte offset 30, then we * will split on offset 20, as this is the earlier of the offsets. * * - If there are two PrewriteRequests, one requesting that we add the TX * and ACK flags, and the other requesting just the SCHED flag, then we * will add the TX, ACK, and SCHED flags to the request. */ FOLLY_NODISCARD const PrewriteRequest& getMergedRequest() const { return mergedRequest_; } private: const PrewriteState& prewriteState_; PrewriteRequest mergedRequest_; }; /** * Structure used to communicate ByteEvents, such as TX and ACK timestamps. */ struct ByteEvent { // types of events; start from 0 to enable indexing in arrays enum Type : uint8_t { WRITE = 0, SCHED = 1, TX = 2, ACK = 3, }; // type Type type; // offset of corresponding byte in raw byte stream size_t offset{0}; // socket timestamp, as recorded by AsyncSocket implementation std::chrono::steady_clock::time_point ts = { std::chrono::steady_clock::now()}; // kernel software timestamp for non-WRITE; for Linux this is CLOCK_REALTIME // see https://www.kernel.org/doc/Documentation/networking/timestamping.txt folly::Optional maybeSoftwareTs; // hardware timestamp for non-WRITE events; see kernel documentation // see https://www.kernel.org/doc/Documentation/networking/timestamping.txt folly::Optional maybeHardwareTs; // for WRITE events, the number of raw bytes written to the socket // optional to prevent accidental misuse in other event types folly::Optional maybeRawBytesWritten; // for WRITE events, the number of raw bytes we tried to write to the socket // optional to prevent accidental misuse in other event types folly::Optional maybeRawBytesTriedToWrite; // for WRITE ByteEvents, additional WriteFlags passed // optional to prevent accidental misuse in other event types folly::Optional maybeWriteFlags; /** * For WRITE events, returns if SCHED timestamp requested. */ FOLLY_NODISCARD bool schedTimestampRequestedOnWrite() const { CHECK_EQ(Type::WRITE, type); CHECK(maybeWriteFlags.has_value()); return isSet(*maybeWriteFlags, WriteFlags::TIMESTAMP_SCHED); } /** * For WRITE events, returns if TX timestamp requested. */ FOLLY_NODISCARD bool txTimestampRequestedOnWrite() const { CHECK_EQ(Type::WRITE, type); CHECK(maybeWriteFlags.has_value()); return isSet(*maybeWriteFlags, WriteFlags::TIMESTAMP_TX); } /** * For WRITE events, returns if ACK timestamp requested. */ FOLLY_NODISCARD bool ackTimestampRequestedOnWrite() const { CHECK_EQ(Type::WRITE, type); CHECK(maybeWriteFlags.has_value()); return isSet(*maybeWriteFlags, WriteFlags::TIMESTAMP_ACK); } }; /** * close() will be invoked when the socket is being closed. * * Can be called multiple times during shutdown / destruction for the same * socket. Observers may detach after first call or track if event * previously observed. * * @param socket Socket being closed. */ virtual void close(AsyncSocket* /* socket */) noexcept {} /** * connectAttempt() will be invoked when connect() is called. * * Triggered before any application connection callback. * * @param socket Socket that attempts to connect. */ virtual void connectAttempt(AsyncSocket* /* socket */) noexcept {} /** * connectSuccess() will be invoked when connect() returns successfully. * * Triggered before any application connection callback. * * @param socket Socket that has connected. */ virtual void connectSuccess(AsyncSocket* /* socket */) noexcept {} /** * connectError() will be invoked when connect() returns an error. * * Triggered before any application connection callback. * * @param socket Socket that has connected. * @param ex Exception that describes why. */ virtual void connectError( AsyncSocket* /* socket */, const AsyncSocketException& /* ex */) noexcept {} /** * Invoked when the socket is being attached to an EventBase. * * Called from within the EventBase thread being attached. * * @param socket Socket with EventBase change. * @param evb The EventBase being attached. */ virtual void evbAttach(AsyncSocket* /* socket */, EventBase* /* evb */) {} /** * Invoked when the socket is being detached from an EventBase. * * Called from within the EventBase thread being detached. * * @param socket Socket with EventBase change. * @param evb The EventBase that is being detached. */ virtual void evbDetach(AsyncSocket* /* socket */, EventBase* /* evb */) {} /** * Invoked each time a ByteEvent is available. * * Multiple ByteEvent may be generated for the same byte offset and event. * For instance, kernel software and hardware TX timestamps for the same * are delivered in separate CMsg, and thus will result in separate * ByteEvent. * * @param socket Socket that ByteEvent is available for. * @param event ByteEvent (WRITE, SCHED, TX, ACK). */ virtual void byteEvent( AsyncSocket* /* socket */, const ByteEvent& /* event */) noexcept {} /** * Invoked if ByteEvents are enabled. * * Only called if the observer's configuration requested ByteEvents. May * be invoked multiple times if ByteEvent configuration changes (i.e., if * ByteEvents are enabled without hardware timestamps, and then enabled * with them). * * @param socket Socket that ByteEvents are enabled for. */ virtual void byteEventsEnabled(AsyncSocket* /* socket */) noexcept {} /** * Invoked if ByteEvents could not be enabled, or if an error occurred that * will prevent further delivery of ByteEvents. * * An observer may be waiting to receive a ByteEvent, such as an ACK event * confirming delivery of the last byte of a payload, before closing the * socket. If the socket has become unhealthy then this ByteEvent may * never occur, yet the handler may be unaware that the socket is * unhealthy if reads have been shutdown and no writes are occurring; this * observer signal breaks this 'deadlock'. * * @param socket Socket that ByteEvents are now unavailable for. * @param ex Details on why ByteEvents are now unavailable. */ virtual void byteEventsUnavailable( AsyncSocket* /* socket */, const AsyncSocketException& /* ex */) noexcept {} /** * Invoked before each write to the socket if prewrite support enabled. * * The observer receives information about the pending write in the * PrewriteState and can request ByteEvents / socket timestamps by returning * a PrewriteRequest. The request contains the offset to split the write at * (if any) and WriteFlags to apply. * * PrewriteRequests are aggregated across observers. The write buffer is * split at the lowest offset returned by all observers. Flags are applied * based on configuration within the PrewriteRequest. Requests are not * sticky and expire after each write. * * Fewer bytes may be written than indicated in the PrewriteState or in the * PrewriteRequest split if the underlying socket / kernel * blocks on write. * * @param socket Socket that ByteEvents are now unavailable for. * @param state Pending write start and end offsets and flags. * @param container Container of PrewriteRequests that observer can add to. */ virtual void prewrite( AsyncSocket* /* transport */, const PrewriteState& /* state */, PrewriteRequestContainer& /* container */) { folly::terminate_with( "prewrite() called but not defined"); } /** * fdDetach() is invoked if the socket file descriptor is detached. * * detachNetworkSocket() will be triggered when a new AsyncSocket is being * constructed from an old one. See the moved() event for details about * this special case. * * @param socket Socket for which detachNetworkSocket was invoked. */ virtual void fdDetach(AsyncSocket* /* socket */) noexcept {} /** * fdAttach() is invoked when the socket file descriptor is attached. * * @param socket Socket for which handleNetworkSocketAttached was * invoked. */ virtual void fdAttach(AsyncSocket* /* socket */) noexcept {} /** * move() will be invoked when a new AsyncSocket is being constructed via * constructor AsyncSocket(AsyncSocket* oldAsyncSocket) from an AsyncSocket * that has an observer attached. * * This type of construction is common during TLS/SSL accept process. * wangle::Acceptor may transform an AsyncSocket to an AsyncFizzServer, and * then transform the AsyncFizzServer to an AsyncSSLSocket on fallback. * AsyncFizzServer and AsyncSSLSocket derive from AsyncSocket and at each * stage the aforementioned constructor will be called. * * Observers may be attached when the initial AsyncSocket is created, before * TLS/SSL accept handling has completed. As a result, AsyncSocket must * notify the observer during each transformation so that: * (1) The observer can track these transformations for debugging. * (2) The observer does not become separated from the underlying * operating system socket and corresponding file descriptor. * * When a new AsyncSocket is being constructed via the aforementioned * constructor, the following observer events will be triggered: * (1) fdDetach * (2) move * * When move is triggered, the observer can CHOOSE to detach the old socket * and attach to the new socket. This process will not happen automatically; * the observer must explicitly perform these steps. * * @param oldSocket Old socket that fd was detached from. * @param newSocket New socket being constructed with fd attached. */ virtual void move( AsyncSocket* /* oldSocket */, AsyncSocket* /* newSocket */) noexcept {} }; } // namespace folly