|
import asyncio
|
|
import datetime
|
|
import logging
|
|
import queue
|
|
import random
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Callable, Dict, List, Optional, Set
|
|
|
|
from av.frame import Frame
|
|
|
|
from . import clock
|
|
from .codecs import depayload, get_capabilities, get_decoder, is_rtx
|
|
from .exceptions import InvalidStateError
|
|
from .jitterbuffer import JitterBuffer
|
|
from .mediastreams import MediaStreamError, MediaStreamTrack
|
|
from .rate import RemoteBitrateEstimator
|
|
from .rtcdtlstransport import RTCDtlsTransport
|
|
from .rtcrtpparameters import (
|
|
RTCRtpCapabilities,
|
|
RTCRtpCodecParameters,
|
|
RTCRtpReceiveParameters,
|
|
)
|
|
from .rtp import (
|
|
RTCP_PSFB_APP,
|
|
RTCP_PSFB_PLI,
|
|
RTCP_RTPFB_NACK,
|
|
RTP_HISTORY_SIZE,
|
|
AnyRtcpPacket,
|
|
RtcpByePacket,
|
|
RtcpPsfbPacket,
|
|
RtcpReceiverInfo,
|
|
RtcpRrPacket,
|
|
RtcpRtpfbPacket,
|
|
RtcpSrPacket,
|
|
RtpPacket,
|
|
clamp_packets_lost,
|
|
pack_remb_fci,
|
|
unwrap_rtx,
|
|
)
|
|
from .stats import (
|
|
RTCInboundRtpStreamStats,
|
|
RTCRemoteOutboundRtpStreamStats,
|
|
RTCStatsReport,
|
|
)
|
|
from .utils import uint16_add, uint16_gt
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def decoder_worker(loop, input_q, output_q):
|
|
codec_name = None
|
|
decoder = None
|
|
|
|
while True:
|
|
task = input_q.get()
|
|
if task is None:
|
|
|
|
asyncio.run_coroutine_threadsafe(output_q.put(None), loop)
|
|
break
|
|
codec, encoded_frame = task
|
|
|
|
if codec.name != codec_name:
|
|
decoder = get_decoder(codec)
|
|
codec_name = codec.name
|
|
|
|
for frame in decoder.decode(encoded_frame):
|
|
|
|
asyncio.run_coroutine_threadsafe(output_q.put(frame), loop)
|
|
|
|
if decoder is not None:
|
|
del decoder
|
|
|
|
|
|
class NackGenerator:
|
|
def __init__(self) -> None:
|
|
self.max_seq: Optional[int] = None
|
|
self.missing: Set[int] = set()
|
|
|
|
def add(self, packet: RtpPacket) -> bool:
|
|
"""
|
|
Mark a new packet as received, and deduce missing packets.
|
|
"""
|
|
missed = False
|
|
|
|
if self.max_seq is None:
|
|
self.max_seq = packet.sequence_number
|
|
return missed
|
|
|
|
|
|
if uint16_gt(packet.sequence_number, self.max_seq):
|
|
seq = uint16_add(self.max_seq, 1)
|
|
while uint16_gt(packet.sequence_number, seq):
|
|
self.missing.add(seq)
|
|
missed = True
|
|
seq = uint16_add(seq, 1)
|
|
self.max_seq = packet.sequence_number
|
|
else:
|
|
self.missing.discard(packet.sequence_number)
|
|
|
|
|
|
self.truncate()
|
|
|
|
return missed
|
|
|
|
def truncate(self) -> None:
|
|
"""
|
|
Limit the number of missing packets we track.
|
|
|
|
Otherwise, the size of RTCP FB messages grows indefinitely.
|
|
"""
|
|
if self.max_seq is not None:
|
|
min_seq = uint16_add(self.max_seq, -RTP_HISTORY_SIZE)
|
|
for seq in list(self.missing):
|
|
if uint16_gt(min_seq, seq):
|
|
self.missing.discard(seq)
|
|
|
|
|
|
class StreamStatistics:
|
|
def __init__(self, clockrate: int) -> None:
|
|
self.base_seq: Optional[int] = None
|
|
self.max_seq: Optional[int] = None
|
|
self.cycles = 0
|
|
self.packets_received = 0
|
|
|
|
|
|
self._clockrate = clockrate
|
|
self._jitter_q4 = 0
|
|
self._last_arrival: Optional[int] = None
|
|
self._last_timestamp: Optional[int] = None
|
|
|
|
|
|
self._expected_prior = 0
|
|
self._received_prior = 0
|
|
|
|
def add(self, packet: RtpPacket) -> None:
|
|
in_order = self.max_seq is None or uint16_gt(
|
|
packet.sequence_number, self.max_seq
|
|
)
|
|
self.packets_received += 1
|
|
|
|
if self.base_seq is None:
|
|
self.base_seq = packet.sequence_number
|
|
|
|
if in_order:
|
|
arrival = int(time.time() * self._clockrate)
|
|
|
|
if self.max_seq is not None and packet.sequence_number < self.max_seq:
|
|
self.cycles += 1 << 16
|
|
self.max_seq = packet.sequence_number
|
|
|
|
if packet.timestamp != self._last_timestamp and self.packets_received > 1:
|
|
diff = abs(
|
|
(arrival - self._last_arrival)
|
|
- (packet.timestamp - self._last_timestamp)
|
|
)
|
|
self._jitter_q4 += diff - ((self._jitter_q4 + 8) >> 4)
|
|
|
|
self._last_arrival = arrival
|
|
self._last_timestamp = packet.timestamp
|
|
|
|
@property
|
|
def fraction_lost(self) -> int:
|
|
expected_interval = self.packets_expected - self._expected_prior
|
|
self._expected_prior = self.packets_expected
|
|
received_interval = self.packets_received - self._received_prior
|
|
self._received_prior = self.packets_received
|
|
lost_interval = expected_interval - received_interval
|
|
if expected_interval == 0 or lost_interval <= 0:
|
|
return 0
|
|
else:
|
|
return (lost_interval << 8) // expected_interval
|
|
|
|
@property
|
|
def jitter(self) -> int:
|
|
return self._jitter_q4 >> 4
|
|
|
|
@property
|
|
def packets_expected(self) -> int:
|
|
return self.cycles + self.max_seq - self.base_seq + 1
|
|
|
|
@property
|
|
def packets_lost(self) -> int:
|
|
return clamp_packets_lost(self.packets_expected - self.packets_received)
|
|
|
|
|
|
class RemoteStreamTrack(MediaStreamTrack):
|
|
def __init__(self, kind: str, id: Optional[str] = None) -> None:
|
|
super().__init__()
|
|
self.kind = kind
|
|
if id is not None:
|
|
self._id = id
|
|
self._queue: asyncio.Queue = asyncio.Queue()
|
|
|
|
async def recv(self) -> Frame:
|
|
"""
|
|
Receive the next frame.
|
|
"""
|
|
if self.readyState != "live":
|
|
raise MediaStreamError
|
|
|
|
frame = await self._queue.get()
|
|
if frame is None:
|
|
self.stop()
|
|
raise MediaStreamError
|
|
return frame
|
|
|
|
|
|
class TimestampMapper:
|
|
def __init__(self) -> None:
|
|
self._last: Optional[int] = None
|
|
self._origin: Optional[int] = None
|
|
|
|
def map(self, timestamp: int) -> int:
|
|
if self._origin is None:
|
|
|
|
self._origin = timestamp
|
|
elif timestamp < self._last:
|
|
|
|
self._origin -= 1 << 32
|
|
|
|
self._last = timestamp
|
|
return timestamp - self._origin
|
|
|
|
|
|
@dataclass
|
|
class RTCRtpContributingSource:
|
|
"""
|
|
The :class:`RTCRtpContributingSource` dictionary contains information about
|
|
a contributing source (CSRC).
|
|
"""
|
|
|
|
timestamp: datetime.datetime
|
|
"The timestamp associated with this source."
|
|
source: int
|
|
"The CSRC identifier associated with this source."
|
|
|
|
|
|
@dataclass
|
|
class RTCRtpSynchronizationSource:
|
|
"""
|
|
The :class:`RTCRtpSynchronizationSource` dictionary contains information about
|
|
a synchronization source (SSRC).
|
|
"""
|
|
|
|
timestamp: datetime.datetime
|
|
"The timestamp associated with this source."
|
|
source: int
|
|
"The SSRC identifier associated with this source."
|
|
|
|
|
|
class RTCRtpReceiver:
|
|
"""
|
|
The :class:`RTCRtpReceiver` interface manages the reception and decoding
|
|
of data for a :class:`MediaStreamTrack`.
|
|
|
|
:param kind: The kind of media (`'audio'` or `'video'`).
|
|
:param transport: An :class:`RTCDtlsTransport`.
|
|
"""
|
|
|
|
def __init__(self, kind: str, transport: RTCDtlsTransport) -> None:
|
|
if transport.state == "closed":
|
|
raise InvalidStateError
|
|
|
|
self._enabled = True
|
|
self.__active_ssrc: Dict[int, datetime.datetime] = {}
|
|
self.__codecs: Dict[int, RTCRtpCodecParameters] = {}
|
|
self.__decoder_queue: queue.Queue = queue.Queue()
|
|
self.__decoder_thread: Optional[threading.Thread] = None
|
|
self.__kind = kind
|
|
if kind == "audio":
|
|
self.__jitter_buffer = JitterBuffer(capacity=16, prefetch=4)
|
|
self.__nack_generator = None
|
|
self.__remote_bitrate_estimator = None
|
|
else:
|
|
self.__jitter_buffer = JitterBuffer(capacity=128, is_video=True)
|
|
self.__nack_generator = NackGenerator()
|
|
self.__remote_bitrate_estimator = RemoteBitrateEstimator()
|
|
self._track: Optional[RemoteStreamTrack] = None
|
|
self.__rtcp_exited = asyncio.Event()
|
|
self.__rtcp_started = asyncio.Event()
|
|
self.__rtcp_task: Optional[asyncio.Future[None]] = None
|
|
self.__rtx_ssrc: Dict[int, int] = {}
|
|
self.__started = False
|
|
self.__stats = RTCStatsReport()
|
|
self.__timestamp_mapper = TimestampMapper()
|
|
self.__transport = transport
|
|
|
|
|
|
self.__lsr: Dict[int, int] = {}
|
|
self.__lsr_time: Dict[int, float] = {}
|
|
self.__remote_streams: Dict[int, StreamStatistics] = {}
|
|
self.__rtcp_ssrc: Optional[int] = None
|
|
|
|
|
|
self.__log_debug: Callable[..., None] = lambda *args: None
|
|
if logger.isEnabledFor(logging.DEBUG):
|
|
self.__log_debug = lambda msg, *args: logger.debug(
|
|
f"RTCRtpReceiver(%s) {msg}", self.__kind, *args
|
|
)
|
|
|
|
@property
|
|
def track(self) -> MediaStreamTrack:
|
|
"""
|
|
The :class:`MediaStreamTrack` which is being handled by the receiver.
|
|
"""
|
|
return self._track
|
|
|
|
@property
|
|
def transport(self) -> RTCDtlsTransport:
|
|
"""
|
|
The :class:`RTCDtlsTransport` over which the media for the receiver's
|
|
track is received.
|
|
"""
|
|
return self.__transport
|
|
|
|
@classmethod
|
|
def getCapabilities(self, kind) -> Optional[RTCRtpCapabilities]:
|
|
"""
|
|
Returns the most optimistic view of the system's capabilities for
|
|
receiving media of the given `kind`.
|
|
|
|
:rtype: :class:`RTCRtpCapabilities`
|
|
"""
|
|
return get_capabilities(kind)
|
|
|
|
async def getStats(self) -> RTCStatsReport:
|
|
"""
|
|
Returns statistics about the RTP receiver.
|
|
|
|
:rtype: :class:`RTCStatsReport`
|
|
"""
|
|
for ssrc, stream in self.__remote_streams.items():
|
|
self.__stats.add(
|
|
RTCInboundRtpStreamStats(
|
|
|
|
timestamp=clock.current_datetime(),
|
|
type="inbound-rtp",
|
|
id="inbound-rtp_" + str(id(self)),
|
|
|
|
ssrc=ssrc,
|
|
kind=self.__kind,
|
|
transportId=self.transport._stats_id,
|
|
|
|
packetsReceived=stream.packets_received,
|
|
packetsLost=stream.packets_lost,
|
|
jitter=stream.jitter,
|
|
|
|
)
|
|
)
|
|
self.__stats.update(self.transport._get_stats())
|
|
|
|
return self.__stats
|
|
|
|
def getSynchronizationSources(self) -> List[RTCRtpSynchronizationSource]:
|
|
"""
|
|
Returns a :class:`RTCRtpSynchronizationSource` for each unique SSRC identifier
|
|
received in the last 10 seconds.
|
|
"""
|
|
cutoff = clock.current_datetime() - datetime.timedelta(seconds=10)
|
|
sources = []
|
|
for source, timestamp in self.__active_ssrc.items():
|
|
if timestamp >= cutoff:
|
|
sources.append(
|
|
RTCRtpSynchronizationSource(source=source, timestamp=timestamp)
|
|
)
|
|
return sources
|
|
|
|
async def receive(self, parameters: RTCRtpReceiveParameters) -> None:
|
|
"""
|
|
Attempt to set the parameters controlling the receiving of media.
|
|
|
|
:param parameters: The :class:`RTCRtpParameters` for the receiver.
|
|
"""
|
|
if not self.__started:
|
|
for codec in parameters.codecs:
|
|
self.__codecs[codec.payloadType] = codec
|
|
for encoding in parameters.encodings:
|
|
if encoding.rtx:
|
|
self.__rtx_ssrc[encoding.rtx.ssrc] = encoding.ssrc
|
|
|
|
|
|
self.__decoder_thread = threading.Thread(
|
|
target=decoder_worker,
|
|
name=self.__kind + "-decoder",
|
|
args=(
|
|
asyncio.get_event_loop(),
|
|
self.__decoder_queue,
|
|
self._track._queue,
|
|
),
|
|
)
|
|
self.__decoder_thread.start()
|
|
|
|
self.__transport._register_rtp_receiver(self, parameters)
|
|
self.__rtcp_task = asyncio.ensure_future(self._run_rtcp())
|
|
self.__started = True
|
|
|
|
def setTransport(self, transport: RTCDtlsTransport) -> None:
|
|
self.__transport = transport
|
|
|
|
async def stop(self) -> None:
|
|
"""
|
|
Irreversibly stop the receiver.
|
|
"""
|
|
if self.__started:
|
|
self.__transport._unregister_rtp_receiver(self)
|
|
self.__stop_decoder()
|
|
|
|
|
|
await self.__rtcp_started.wait()
|
|
self.__rtcp_task.cancel()
|
|
await self.__rtcp_exited.wait()
|
|
|
|
def _handle_disconnect(self) -> None:
|
|
self.__stop_decoder()
|
|
|
|
async def _handle_rtcp_packet(self, packet: AnyRtcpPacket) -> None:
|
|
self.__log_debug("< %s", packet)
|
|
|
|
if isinstance(packet, RtcpSrPacket):
|
|
self.__stats.add(
|
|
RTCRemoteOutboundRtpStreamStats(
|
|
|
|
timestamp=clock.current_datetime(),
|
|
type="remote-outbound-rtp",
|
|
id=f"remote-outbound-rtp_{id(self)}",
|
|
|
|
ssrc=packet.ssrc,
|
|
kind=self.__kind,
|
|
transportId=self.transport._stats_id,
|
|
|
|
packetsSent=packet.sender_info.packet_count,
|
|
bytesSent=packet.sender_info.octet_count,
|
|
|
|
remoteTimestamp=clock.datetime_from_ntp(
|
|
packet.sender_info.ntp_timestamp
|
|
),
|
|
)
|
|
)
|
|
self.__lsr[packet.ssrc] = (
|
|
(packet.sender_info.ntp_timestamp) >> 16
|
|
) & 0xFFFFFFFF
|
|
self.__lsr_time[packet.ssrc] = time.time()
|
|
elif isinstance(packet, RtcpByePacket):
|
|
self.__stop_decoder()
|
|
|
|
async def _handle_rtp_packet(self, packet: RtpPacket, arrival_time_ms: int) -> None:
|
|
"""
|
|
Handle an incoming RTP packet.
|
|
"""
|
|
self.__log_debug("< %s", packet)
|
|
|
|
|
|
if not self._enabled:
|
|
return
|
|
|
|
|
|
if self.__remote_bitrate_estimator is not None:
|
|
if packet.extensions.abs_send_time is not None:
|
|
remb = self.__remote_bitrate_estimator.add(
|
|
abs_send_time=packet.extensions.abs_send_time,
|
|
arrival_time_ms=arrival_time_ms,
|
|
payload_size=len(packet.payload) + packet.padding_size,
|
|
ssrc=packet.ssrc,
|
|
)
|
|
if self.__rtcp_ssrc is not None and remb is not None:
|
|
|
|
rtcp_packet = RtcpPsfbPacket(
|
|
fmt=RTCP_PSFB_APP,
|
|
ssrc=self.__rtcp_ssrc,
|
|
media_ssrc=0,
|
|
fci=pack_remb_fci(*remb),
|
|
)
|
|
await self._send_rtcp(rtcp_packet)
|
|
|
|
|
|
self.__active_ssrc[packet.ssrc] = clock.current_datetime()
|
|
|
|
|
|
codec = self.__codecs.get(packet.payload_type)
|
|
if codec is None:
|
|
self.__log_debug(
|
|
"x RTP packet with unknown payload type %d", packet.payload_type
|
|
)
|
|
return
|
|
|
|
|
|
if packet.ssrc not in self.__remote_streams:
|
|
self.__remote_streams[packet.ssrc] = StreamStatistics(codec.clockRate)
|
|
self.__remote_streams[packet.ssrc].add(packet)
|
|
|
|
|
|
if is_rtx(codec):
|
|
original_ssrc = self.__rtx_ssrc.get(packet.ssrc)
|
|
if original_ssrc is None:
|
|
self.__log_debug("x RTX packet from unknown SSRC %d", packet.ssrc)
|
|
return
|
|
|
|
apt = codec.parameters.get("apt")
|
|
if (
|
|
len(packet.payload) < 2
|
|
or not isinstance(apt, int)
|
|
or apt not in self.__codecs
|
|
):
|
|
return
|
|
|
|
packet = unwrap_rtx(packet, payload_type=apt, ssrc=original_ssrc)
|
|
|
|
|
|
if self.__nack_generator is not None and self.__nack_generator.add(packet):
|
|
await self._send_rtcp_nack(
|
|
packet.ssrc, sorted(self.__nack_generator.missing)
|
|
)
|
|
|
|
|
|
try:
|
|
if packet.payload:
|
|
packet._data = depayload(codec, packet.payload)
|
|
else:
|
|
packet._data = b""
|
|
except ValueError as exc:
|
|
self.__log_debug("x RTP payload parsing failed: %s", exc)
|
|
return
|
|
|
|
|
|
pli_flag, encoded_frame = self.__jitter_buffer.add(packet)
|
|
|
|
if pli_flag:
|
|
await self._send_rtcp_pli(packet.ssrc)
|
|
|
|
|
|
if encoded_frame is not None and self.__decoder_thread:
|
|
encoded_frame.timestamp = self.__timestamp_mapper.map(
|
|
encoded_frame.timestamp
|
|
)
|
|
self.__decoder_queue.put((codec, encoded_frame))
|
|
|
|
async def _run_rtcp(self) -> None:
|
|
self.__log_debug("- RTCP started")
|
|
self.__rtcp_started.set()
|
|
|
|
try:
|
|
while True:
|
|
|
|
|
|
await asyncio.sleep(0.5 + random.random())
|
|
|
|
|
|
reports = []
|
|
for ssrc, stream in self.__remote_streams.items():
|
|
lsr = 0
|
|
dlsr = 0
|
|
if ssrc in self.__lsr:
|
|
lsr = self.__lsr[ssrc]
|
|
delay = time.time() - self.__lsr_time[ssrc]
|
|
if delay > 0 and delay < 65536:
|
|
dlsr = int(delay * 65536)
|
|
|
|
reports.append(
|
|
RtcpReceiverInfo(
|
|
ssrc=ssrc,
|
|
fraction_lost=stream.fraction_lost,
|
|
packets_lost=stream.packets_lost,
|
|
highest_sequence=stream.max_seq,
|
|
jitter=stream.jitter,
|
|
lsr=lsr,
|
|
dlsr=dlsr,
|
|
)
|
|
)
|
|
|
|
if self.__rtcp_ssrc is not None and reports:
|
|
packet = RtcpRrPacket(ssrc=self.__rtcp_ssrc, reports=reports)
|
|
await self._send_rtcp(packet)
|
|
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
self.__log_debug("- RTCP finished")
|
|
self.__rtcp_exited.set()
|
|
|
|
async def _send_rtcp(self, packet) -> None:
|
|
self.__log_debug("> %s", packet)
|
|
try:
|
|
await self.transport._send_rtp(bytes(packet))
|
|
except ConnectionError:
|
|
pass
|
|
|
|
async def _send_rtcp_nack(self, media_ssrc: int, lost: List[int]) -> None:
|
|
"""
|
|
Send an RTCP packet to report missing RTP packets.
|
|
"""
|
|
if self.__rtcp_ssrc is not None:
|
|
packet = RtcpRtpfbPacket(
|
|
fmt=RTCP_RTPFB_NACK, ssrc=self.__rtcp_ssrc, media_ssrc=media_ssrc
|
|
)
|
|
packet.lost = lost
|
|
await self._send_rtcp(packet)
|
|
|
|
async def _send_rtcp_pli(self, media_ssrc: int) -> None:
|
|
"""
|
|
Send an RTCP packet to report picture loss.
|
|
"""
|
|
if self.__rtcp_ssrc is not None:
|
|
packet = RtcpPsfbPacket(
|
|
fmt=RTCP_PSFB_PLI, ssrc=self.__rtcp_ssrc, media_ssrc=media_ssrc
|
|
)
|
|
await self._send_rtcp(packet)
|
|
|
|
def _set_rtcp_ssrc(self, ssrc: int) -> None:
|
|
self.__rtcp_ssrc = ssrc
|
|
|
|
def __stop_decoder(self) -> None:
|
|
"""
|
|
Stop the decoder thread, which will in turn stop the track.
|
|
"""
|
|
if self.__decoder_thread:
|
|
self.__decoder_queue.put(None)
|
|
self.__decoder_thread.join()
|
|
self.__decoder_thread = None
|
|
|