diff --git a/include/livekit/room.h b/include/livekit/room.h index 33839158..69d5490f 100644 --- a/include/livekit/room.h +++ b/include/livekit/room.h @@ -17,13 +17,17 @@ #pragma once #include +#include #include #include #include "livekit/data_stream.h" #include "livekit/e2ee.h" #include "livekit/ffi_handle.h" +#include "livekit/result.h" #include "livekit/room_event_types.h" +#include "livekit/session_stats_error.h" +#include "livekit/stats.h" #include "livekit/subscription_thread_dispatcher.h" #include "livekit/visibility.h" @@ -187,6 +191,21 @@ class LIVEKIT_API Room { /// Returns the current connection state of the room. ConnectionState connectionState() const; + /// Retrieve aggregated WebRTC stats for this room session. + /// + /// Behavior: + /// - If the room is not currently connected (no live FFI handle), resolves + /// immediately with a `GetSessionStatsErrorCode::NOT_CONNECTED` failure. + /// - Otherwise dispatches an async `get_session_stats` request to the Rust + /// FFI; the future resolves once the corresponding callback arrives. + /// - The future never throws — failures are surfaced as a typed + /// `GetSessionStatsError`. Inspect `Result::ok()` / `Result::error().code` + /// to branch on outcome. + /// + /// @return Future resolving with publisher + subscriber stats on success, + /// or a typed error code + message on failure. + std::future> getStats() const; + /* Register a handler for incoming text streams on a specific topic. * * When a remote participant opens a text stream with the given topic, diff --git a/include/livekit/session_stats_error.h b/include/livekit/session_stats_error.h new file mode 100644 index 00000000..1ca44ee5 --- /dev/null +++ b/include/livekit/session_stats_error.h @@ -0,0 +1,49 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace livekit { + +/// Categorical reason code for a failed `Room::getStats()` call. +enum class GetSessionStatsErrorCode : std::uint32_t { + /// Catch-all: the FFI returned an error message that does not map to a more + /// specific code. + UNKNOWN = 0, + /// The `Room` has no live FFI handle (never connected or already + /// disconnected). + NOT_CONNECTED = 1, + /// The FFI responded with an unexpected response shape (e.g. a missing + /// `get_session_stats` field on the synchronous response). + PROTOCOL_ERROR = 2, + /// The FFI threw an internal error while servicing the request (e.g. the + /// underlying Rust engine reported a failure). + INTERNAL = 3, +}; + +/// Typed error returned by `Room::getStats()`. +/// +/// Surfaces the error reason as a `GetSessionStatsErrorCode` plus an +/// implementation-defined message for diagnostics/logging. +struct GetSessionStatsError { + GetSessionStatsErrorCode code{GetSessionStatsErrorCode::UNKNOWN}; + std::string message; +}; + +} // namespace livekit diff --git a/include/livekit/stats.h b/include/livekit/stats.h index 84674d8e..225b5878 100644 --- a/include/livekit/stats.h +++ b/include/livekit/stats.h @@ -501,6 +501,21 @@ struct RtcStats { RtcStatsVariant stats; }; +/// Aggregated WebRTC stats for a connected room session. +/// +/// Mirrors the FFI `GetSessionStatsCallback.Result` payload: stats are split +/// between the publisher peer connection (outbound media flowing from the +/// local participant to the SFU) and the subscriber peer connection (inbound +/// media flowing from the SFU back to the local participant). When the SDK is +/// operating in single-peer-connection mode the publisher list carries the +/// combined stats and the subscriber list is empty. +struct SessionStats { + /// Stats from the publisher peer connection (outbound media). + std::vector publisher_stats; + /// Stats from the subscriber peer connection (inbound media). + std::vector subscriber_stats; +}; + // ---------------------- // fromProto declarations // ---------------------- diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 8105ffb7..8cc7d946 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -428,6 +428,73 @@ std::future> FfiClient::getTrackStatsAsync(uintptr_t track return fut; } +namespace { + +std::future> readySessionStatsFailure(GetSessionStatsErrorCode code, + std::string message) { + std::promise> pr; + pr.set_value(Result::failure(GetSessionStatsError{code, std::move(message)})); + return pr.get_future(); +} + +} // namespace + +std::future> FfiClient::getSessionStatsAsync(uintptr_t room_handle) { + const AsyncId async_id = generateAsyncId(); + + auto fut = registerAsync>( + async_id, + // match + [async_id](const proto::FfiEvent& event) { + return event.has_get_session_stats() && event.get_session_stats().async_id() == async_id; + }, + // handler + [](const proto::FfiEvent& event, std::promise>& pr) { + const auto& cb = event.get_session_stats(); + if (cb.has_error()) { + pr.set_value(Result::failure( + GetSessionStatsError{GetSessionStatsErrorCode::INTERNAL, cb.error()})); + return; + } + if (!cb.has_result()) { + pr.set_value(Result::failure(GetSessionStatsError{ + GetSessionStatsErrorCode::PROTOCOL_ERROR, "GetSessionStatsCallback missing result and error"})); + return; + } + + const auto& result = cb.result(); + SessionStats stats; + stats.publisher_stats.reserve(result.publisher_stats_size()); + for (const auto& ps : result.publisher_stats()) { + stats.publisher_stats.push_back(fromProto(ps)); + } + stats.subscriber_stats.reserve(result.subscriber_stats_size()); + for (const auto& ps : result.subscriber_stats()) { + stats.subscriber_stats.push_back(fromProto(ps)); + } + pr.set_value(Result::success(std::move(stats))); + }); + + proto::FfiRequest req; + auto* get_session_stats_req = req.mutable_get_session_stats(); + get_session_stats_req->set_room_handle(room_handle); + get_session_stats_req->set_request_async_id(async_id); + + try { + const proto::FfiResponse resp = sendRequest(req); + if (!resp.has_get_session_stats()) { + cancelPendingByAsyncId(async_id); + return readySessionStatsFailure(GetSessionStatsErrorCode::PROTOCOL_ERROR, + "FfiResponse missing get_session_stats"); + } + } catch (const std::exception& e) { + cancelPendingByAsyncId(async_id); + return readySessionStatsFailure(GetSessionStatsErrorCode::INTERNAL, e.what()); + } + + return fut; +} + // Participant APIs Implementation std::future FfiClient::publishTrackAsync(std::uint64_t local_participant_handle, std::uint64_t track_handle, diff --git a/src/ffi_client.h b/src/ffi_client.h index e66bb3d8..a865ad71 100644 --- a/src/ffi_client.h +++ b/src/ffi_client.h @@ -30,6 +30,7 @@ #include "data_track.pb.h" #include "livekit/data_track_error.h" #include "livekit/result.h" +#include "livekit/session_stats_error.h" #include "livekit/stats.h" #include "livekit/visibility.h" #include "lk_log.h" @@ -97,6 +98,9 @@ class LIVEKIT_INTERNAL_API FfiClient { // Track APIs std::future> getTrackStatsAsync(uintptr_t track_handle); + // Room APIs (stats) + std::future> getSessionStatsAsync(uintptr_t room_handle); + // Participant APIs std::future publishTrackAsync(std::uint64_t local_participant_handle, std::uint64_t track_handle, diff --git a/src/room.cpp b/src/room.cpp index ed258e62..7f7459aa 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -267,6 +267,21 @@ ConnectionState Room::connectionState() const { return connection_state_; } +std::future> Room::getStats() const { + std::shared_ptr handle; + { + const std::scoped_lock g(lock_); + handle = room_handle_; + } + if (!handle) { + std::promise> pr; + pr.set_value(Result::failure( + GetSessionStatsError{GetSessionStatsErrorCode::NOT_CONNECTED, "Room is not connected"})); + return pr.get_future(); + } + return FfiClient::instance().getSessionStatsAsync(handle->get()); +} + E2EEManager* Room::e2eeManager() const { const std::scoped_lock g(lock_); return e2ee_manager_.get(); diff --git a/src/tests/integration/test_session_stats.cpp b/src/tests/integration/test_session_stats.cpp new file mode 100644 index 00000000..03c5ce90 --- /dev/null +++ b/src/tests/integration/test_session_stats.cpp @@ -0,0 +1,210 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../common/audio_utils.h" +#include "../common/test_common.h" + +namespace livekit::test { + +using namespace std::chrono_literals; + +namespace { + +constexpr int kAudioSampleRate = kDefaultAudioSampleRate; +constexpr int kAudioChannels = kDefaultAudioChannels; + +/// Time to let media flow before sampling stats; below this the RTP counters +/// are typically empty and the printed output is uninteresting. +constexpr auto kStatsWarmup = 5s; + +const char* rtcStatsTypeName(const RtcStats& s) { + return std::visit( + [](const auto& v) -> const char* { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return "Codec"; + } else if constexpr (std::is_same_v) { + return "InboundRtp"; + } else if constexpr (std::is_same_v) { + return "OutboundRtp"; + } else if constexpr (std::is_same_v) { + return "RemoteInboundRtp"; + } else if constexpr (std::is_same_v) { + return "RemoteOutboundRtp"; + } else if constexpr (std::is_same_v) { + return "MediaSource"; + } else if constexpr (std::is_same_v) { + return "MediaPlayout"; + } else if constexpr (std::is_same_v) { + return "PeerConnection"; + } else if constexpr (std::is_same_v) { + return "DataChannel"; + } else if constexpr (std::is_same_v) { + return "Transport"; + } else if constexpr (std::is_same_v) { + return "CandidatePair"; + } else if constexpr (std::is_same_v) { + return "LocalCandidate"; + } else if constexpr (std::is_same_v) { + return "RemoteCandidate"; + } else if constexpr (std::is_same_v) { + return "Certificate"; + } else if constexpr (std::is_same_v) { + return "Stream"; + } else { + return "Unknown"; + } + }, + s.stats); +} + +void dumpInterestingEntries(const std::vector& stats) { + for (const auto& stat : stats) { + std::visit( + [&](const auto& s) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + std::cout << " [OutboundRtp] id=" << s.rtc.id << " kind=" << s.stream.kind + << " packets_sent=" << s.sent.packets_sent << " bytes_sent=" << s.sent.bytes_sent + << " target_bitrate=" << std::fixed << std::setprecision(2) << s.outbound.target_bitrate + << std::endl; + } else if constexpr (std::is_same_v) { + std::cout << " [InboundRtp] id=" << s.rtc.id << " kind=" << s.stream.kind + << " packets_received=" << s.received.packets_received + << " packets_lost=" << s.received.packets_lost << " jitter=" << std::fixed << std::setprecision(6) + << s.received.jitter << " bytes_received=" << s.inbound.bytes_received << std::endl; + } else if constexpr (std::is_same_v) { + std::cout << " [CandidatePair] id=" << s.rtc.id << " rtt=" << std::fixed << std::setprecision(4) + << s.candidate_pair.current_round_trip_time << "s" + << " in_bitrate=" << s.candidate_pair.available_incoming_bitrate + << " out_bitrate=" << s.candidate_pair.available_outgoing_bitrate + << " bytes_sent=" << s.candidate_pair.bytes_sent + << " bytes_received=" << s.candidate_pair.bytes_received << std::endl; + } else if constexpr (std::is_same_v) { + std::cout << " [Transport] id=" << s.rtc.id << " packets_sent=" << s.transport.packets_sent + << " packets_received=" << s.transport.packets_received + << " bytes_sent=" << s.transport.bytes_sent << " bytes_received=" << s.transport.bytes_received + << std::endl; + } else if constexpr (std::is_same_v) { + std::cout << " [PeerConnection] id=" << s.rtc.id + << " data_channels_opened=" << s.pc.data_channels_opened + << " data_channels_closed=" << s.pc.data_channels_closed << std::endl; + } + }, + stat.stats); + } +} + +void printSide(const std::string& side_label, const std::vector& stats) { + std::cout << " " << side_label << " entries=" << stats.size(); + std::map type_counts; + for (const auto& s : stats) { + type_counts[rtcStatsTypeName(s)]++; + } + if (!type_counts.empty()) { + std::cout << " types:"; + for (const auto& kv : type_counts) { + std::cout << " " << kv.first << "=" << kv.second; + } + } + std::cout << std::endl; + dumpInterestingEntries(stats); +} + +void printSessionStats(const std::string& room_label, const SessionStats& stats) { + std::cout << "[SessionStats] " << room_label << ":" << std::endl; + printSide("publisher", stats.publisher_stats); + printSide("subscriber", stats.subscriber_stats); +} + +} // namespace + +class SessionStatsIntegrationTest : public LiveKitTestBase {}; + +TEST_F(SessionStatsIntegrationTest, PublishAudioThenFetchSessionStats) { + skipIfNotConfigured(); + + RoomOptions options; + options.auto_subscribe = true; + options.single_peer_connection = false; + + auto receiver_room = std::make_unique(); + ASSERT_TRUE(receiver_room->connect(config_.url, config_.token_b, options)) << "Receiver failed to connect"; + + auto sender_room = std::make_unique(); + ASSERT_TRUE(sender_room->connect(config_.url, config_.token_a, options)) << "Sender failed to connect"; + + auto source = std::make_shared(kAudioSampleRate, kAudioChannels, 0); + auto track = LocalAudioTrack::createLocalAudioTrack("session-stats-audio", source); + TrackPublishOptions opts; + opts.source = TrackSource::SOURCE_MICROPHONE; + sender_room->localParticipant()->publishTrack(track, opts); + std::cerr << "[SessionStats] published audio track sid=" << track->sid() << std::endl; + + std::atomic running{true}; + std::thread audio_thread([&]() { runToneLoop(source, running, /*base_freq_hz=*/440.0, /*siren_mode=*/false); }); + + std::this_thread::sleep_for(kStatsWarmup); + + auto sender_fut = sender_room->getStats(); + auto receiver_fut = receiver_room->getStats(); + + auto sender_result = sender_fut.get(); + auto receiver_result = receiver_fut.get(); + + running.store(false, std::memory_order_relaxed); + if (audio_thread.joinable()) { + audio_thread.join(); + } + if (track->publication()) { + sender_room->localParticipant()->unpublishTrack(track->publication()->sid()); + } + + ASSERT_TRUE(sender_result.ok()) << "Sender getStats failed: code=" << static_cast(sender_result.error().code) + << " msg=" << sender_result.error().message; + ASSERT_TRUE(receiver_result.ok()) << "Receiver getStats failed: code=" + << static_cast(receiver_result.error().code) + << " msg=" << receiver_result.error().message; + + printSessionStats("sender", sender_result.value()); + printSessionStats("receiver", receiver_result.value()); + + EXPECT_FALSE(sender_result.value().publisher_stats.empty()) << "Sender should have publisher stats"; + EXPECT_FALSE(receiver_result.value().subscriber_stats.empty()) << "Receiver should have subscriber stats"; +} + +TEST_F(SessionStatsIntegrationTest, NotConnectedReturnsNotConnected) { + Room room; + auto fut = room.getStats(); + auto result = fut.get(); + EXPECT_FALSE(result.ok()); + EXPECT_EQ(result.error().code, GetSessionStatsErrorCode::NOT_CONNECTED); + std::cerr << "[SessionStats] disconnected message: " << result.error().message << std::endl; +} + +} // namespace livekit::test diff --git a/src/tests/unit/test_ffi_client.cpp b/src/tests/unit/test_ffi_client.cpp index f7516462..9acc1515 100644 --- a/src/tests/unit/test_ffi_client.cpp +++ b/src/tests/unit/test_ffi_client.cpp @@ -221,6 +221,15 @@ TEST_F(FfiClientTest, NotInitialized_GetTrackStatsAsyncThrows) { EXPECT_THROW(FfiClient::instance().getTrackStatsAsync(1), std::runtime_error); } +TEST_F(FfiClientTest, NotInitialized_GetSessionStatsAsyncFails) { + ASSERT_FALSE(FfiClient::instance().isInitialized()); + + auto fut_result = FfiClient::instance().getSessionStatsAsync(1); + auto result = fut_result.get(); + EXPECT_FALSE(result.ok()); + EXPECT_EQ(result.error().code, GetSessionStatsErrorCode::INTERNAL); +} + TEST_F(FfiClientTest, NotInitialized_PublishDataTrackAsyncFails) { ASSERT_FALSE(FfiClient::instance().isInitialized());