|
import enum
|
|
import ipaddress
|
|
import re
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
|
|
|
from . import rtp
|
|
from .rtcdtlstransport import RTCDtlsFingerprint, RTCDtlsParameters
|
|
from .rtcicetransport import RTCIceCandidate, RTCIceParameters
|
|
from .rtcrtpparameters import (
|
|
ParametersDict,
|
|
RTCRtcpFeedback,
|
|
RTCRtpCodecParameters,
|
|
RTCRtpHeaderExtensionParameters,
|
|
RTCRtpParameters,
|
|
)
|
|
from .rtcsctptransport import RTCSctpCapabilities
|
|
|
|
# Direction attributes accepted as "a=<direction>" inside a media section.
DIRECTIONS = ["inactive", "sendonly", "recvonly", "sendrecv"]

# DTLS role name -> value written in the "a=setup" attribute.
DTLS_ROLE_SETUP = {"auto": "actpass", "client": "active", "server": "passive"}
# Reverse mapping: "a=setup" value -> DTLS role name.
DTLS_SETUP_ROLE = {setup: role for role, setup in DTLS_ROLE_SETUP.items()}

# "a=fmtp" parameter names whose values are converted to int when parsed.
FMTP_INT_PARAMETERS = [
    "apt",
    "max-fr",
    "max-fs",
    "maxplaybackrate",
    "minptime",
    "stereo",
    "useinbandfec",
]
|
|
|
|
|
|
class BitPattern:
    """
    Match an integer against an 8-character bit pattern.

    The pattern is read most significant bit first. A "1" requires the
    corresponding bit to be set, an "x" ignores the bit, and any other
    character (such as "0") requires the bit to be clear.
    """

    def __init__(self, v: str) -> None:
        wildcard_bits = self._bytemaskstring("x", v)
        # Keep every bit that is not a wildcard.
        self._mask = ~wildcard_bits
        self._masked_value = self._bytemaskstring("1", v)

    def matches(self, v: int) -> bool:
        """Return True if `v` agrees with the pattern on all non-wildcard bits."""
        significant_bits = v & self._mask
        return significant_bits == self._masked_value

    def _bytemaskstring(self, c: str, s: str) -> int:
        """Return a byte with a 1 wherever the first 8 characters of `s` equal `c`."""
        return sum((s[index] == c) << (7 - index) for index in range(8))
|
|
|
|
|
|
class H264Profile(enum.Enum):
    """H.264 profiles that a `profile-level-id` value can be resolved to."""

    # Baseline family.
    PROFILE_CONSTRAINED_BASELINE = 0
    PROFILE_BASELINE = 1

    # Main profile.
    PROFILE_MAIN = 2

    # High family.
    PROFILE_CONSTRAINED_HIGH = 3
    PROFILE_HIGH = 4
    PROFILE_PREDICTIVE_HIGH_444 = 5
|
|
|
|
|
|
class H264Level(enum.IntEnum):
    """
    H.264 levels.

    Apart from LEVEL1_B, each member's value is the `level_idc` number it is
    looked up by in `parse_h264_profile_level_id`.
    """

    # Not looked up by value: chosen explicitly when level_idc is 11 and
    # bit 0x10 of profile_iop is set.
    LEVEL1_B = -1

    LEVEL1 = 10
    LEVEL1_1 = 11
    LEVEL1_2 = 12
    LEVEL1_3 = 13

    LEVEL2 = 20
    LEVEL2_1 = 21
    LEVEL2_2 = 22

    LEVEL3 = 30
    LEVEL3_1 = 31
    LEVEL3_2 = 32

    LEVEL4 = 40
    LEVEL4_1 = 41
    LEVEL4_2 = 42

    LEVEL5 = 50
    LEVEL5_1 = 51
    LEVEL5_2 = 52
|
|
|
|
|
|
# Lookup table used by parse_h264_profile_level_id. Each entry is
# (profile_idc, pattern that profile_iop must match, resulting profile);
# entries are tried in order and the first match wins.
H264_PROFILE_PATTERNS = [
    (profile_idc, BitPattern(iop_pattern), profile)
    for profile_idc, iop_pattern, profile in (
        (0x42, "x1xx0000", H264Profile.PROFILE_CONSTRAINED_BASELINE),
        (0x4D, "1xxx0000", H264Profile.PROFILE_CONSTRAINED_BASELINE),
        (0x58, "11xx0000", H264Profile.PROFILE_CONSTRAINED_BASELINE),
        (0x42, "x0xx0000", H264Profile.PROFILE_BASELINE),
        (0x58, "10xx0000", H264Profile.PROFILE_BASELINE),
        (0x4D, "0x0x0000", H264Profile.PROFILE_MAIN),
        (0x64, "00000000", H264Profile.PROFILE_HIGH),
        (0x64, "00001100", H264Profile.PROFILE_CONSTRAINED_HIGH),
        (0xF4, "00000000", H264Profile.PROFILE_PREDICTIVE_HIGH_444),
    )
]
|
|
|
|
|
|
def candidate_from_sdp(sdp: str) -> RTCIceCandidate:
    """
    Build an RTCIceCandidate from the value of an "a=candidate" attribute.

    The value is split on whitespace. The first eight tokens are positional
    (the token at index 6 is not inspected); the remaining tokens are read as
    "<name> <value>" pairs, of which "raddr", "rport" and "tcptype" are used.
    """
    bits = sdp.split()
    assert len(bits) >= 8

    candidate = RTCIceCandidate(
        component=int(bits[1]),
        foundation=bits[0],
        ip=bits[4],
        port=int(bits[5]),
        priority=int(bits[3]),
        protocol=bits[2],
        type=bits[7],
    )

    # Walk the optional extensions two tokens at a time; a dangling name
    # without a value is ignored.
    for name, value in zip(bits[8::2], bits[9::2]):
        if name == "raddr":
            candidate.relatedAddress = value
        elif name == "rport":
            candidate.relatedPort = int(value)
        elif name == "tcptype":
            candidate.tcpType = value

    return candidate
|
|
|
|
|
|
def candidate_to_sdp(candidate: RTCIceCandidate) -> str:
    """
    Serialise a candidate as the value of an "a=candidate" attribute.

    The related address, related port and TCP type are appended only when
    they are not None.
    """
    fragments = [
        f"{candidate.foundation} {candidate.component} {candidate.protocol} "
        f"{candidate.priority} {candidate.ip} {candidate.port} typ {candidate.type}"
    ]

    if candidate.relatedAddress is not None:
        fragments.append(f" raddr {candidate.relatedAddress}")
    if candidate.relatedPort is not None:
        fragments.append(f" rport {candidate.relatedPort}")
    if candidate.tcpType is not None:
        fragments.append(f" tcptype {candidate.tcpType}")

    return "".join(fragments)
|
|
|
|
|
|
def grouplines(sdp: str) -> Tuple[List[str], List[List[str]]]:
    """
    Split an SDP blob into session-level lines and per-media line groups.

    Every line starting with "m=" opens a new media group; lines that follow
    belong to that group. Lines before the first "m=" line are session lines.
    """
    session: List[str] = []
    media: List[List[str]] = []
    for line in sdp.splitlines():
        if line.startswith("m="):
            media.append([line])
            continue
        # Attach to the most recent media section, if any was opened.
        target = media[-1] if media else session
        target.append(line)
    return session, media
|
|
|
|
|
|
def ipaddress_from_sdp(sdp: str) -> str:
    """
    Extract the address from an "IN IP4 <addr>" / "IN IP6 <addr>" string.

    An AssertionError is raised if the string does not have that form.
    """
    match = re.match("^IN (IP4|IP6) ([^ ]+)$", sdp)
    assert match
    _family, address = match.groups()
    return address
|
|
|
|
|
|
def ipaddress_to_sdp(addr: str) -> str:
    """
    Format an IP address as "IN IP4 <addr>" or "IN IP6 <addr>".

    `ipaddress.ip_address` raises ValueError if `addr` is not a valid
    IPv4 or IPv6 address.
    """
    parsed = ipaddress.ip_address(addr)
    version = parsed.version
    return f"IN IP{version} {addr}"
|
|
|
|
|
|
def parameters_from_sdp(sdp: str) -> ParametersDict:
    """
    Parse the parameter list of an "a=fmtp" attribute into a dictionary.

    Parameters are separated by ";". A "name=value" parameter is stored under
    `name`; values of names listed in FMTP_INT_PARAMETERS are converted to
    int (a ValueError is raised if that conversion fails). A parameter without
    "=" is stored with the value None.

    Whitespace around each parameter is ignored so that the widely used
    "a=1; b=2" form yields the same keys as "a=1;b=2", and empty segments
    (empty input, doubled or trailing ";") are skipped rather than stored
    under an empty key.
    """
    parameters: ParametersDict = {}
    for raw_param in sdp.split(";"):
        param = raw_param.strip()
        if not param:
            continue
        if "=" in param:
            k, v = param.split("=", 1)
            if k in FMTP_INT_PARAMETERS:
                parameters[k] = int(v)
            else:
                parameters[k] = v
        else:
            parameters[param] = None
    return parameters
|
|
|
|
|
|
def parameters_to_sdp(parameters: ParametersDict) -> str:
    """
    Serialise a parameter dictionary as an "a=fmtp" parameter list.

    Entries are joined with ";". An entry whose value is None is written as
    the bare name, any other entry as "name=value".
    """
    return ";".join(
        param_k if param_v is None else f"{param_k}={param_v}"
        for param_k, param_v in parameters.items()
    )
|
|
|
|
|
|
def parse_attr(line: str) -> Tuple[str, Optional[str]]:
    """
    Split an attribute line into its name and optional value.

    The first two characters of `line` (the "a=" prefix at the call sites in
    this module) are dropped. If the line contains ":", the remainder is split
    at its first ":" into (name, value); otherwise (remainder, None) is
    returned.
    """
    body = line[2:]
    if ":" not in line:
        return body, None
    pieces = body.split(":", 1)
    return pieces[0], pieces[1]
|
|
|
|
|
|
def parse_h264_profile_level_id(profile_str: str) -> Tuple[H264Profile, H264Level]:
    """
    Decode an H.264 `profile-level-id` value into a (profile, level) pair.

    The value must consist of exactly six hexadecimal digits, read as three
    bytes: profile_idc, profile_iop and level_idc.

    :param profile_str: The `profile-level-id` string, e.g. "42e01f".
    :return: The matching profile and level.
    :raises ValueError: If `profile_str` is not a 6 character hexadecimal
        string, if level_idc is not a known level, or if no entry of
        H264_PROFILE_PATTERNS matches profile_idc and profile_iop.
    """
    # fullmatch (rather than match) so that trailing characters are rejected
    # instead of being silently ignored.
    if not isinstance(profile_str, str) or not re.fullmatch(
        "[0-9a-f]{6}", profile_str, re.I
    ):
        raise ValueError("Expected a 6 character hexadecimal string")

    level_idc = int(profile_str[4:6], 16)
    profile_iop = int(profile_str[2:4], 16)
    profile_idc = int(profile_str[0:2], 16)

    level: H264Level
    if level_idc == H264Level.LEVEL1_1:
        # level_idc 11 is shared by two levels; bit 0x10 of profile_iop
        # selects LEVEL1_B.
        level = H264Level.LEVEL1_B if (profile_iop & 0x10) else H264Level.LEVEL1_1
    else:
        # Raises ValueError for a level_idc that is not an H264Level value.
        level = H264Level(level_idc)

    # The first table entry matching both profile_idc and profile_iop wins.
    for idc, pattern, profile in H264_PROFILE_PATTERNS:
        if idc == profile_idc and pattern.matches(profile_iop):
            return profile, level

    raise ValueError(
        f"Unrecognized profile_iop = {profile_iop}, profile_idc = {profile_idc}"
    )
|
|
|
|
|
|
@dataclass
class GroupDescription:
    """
    A grouping attribute value: a semantic tag followed by a list of items.

    Rendered by `str()` as "<semantic> <item> <item> ...".
    """

    semantic: str
    items: List[Union[int, str]]

    def __str__(self) -> str:
        rendered_items = " ".join(str(item) for item in self.items)
        return f"{self.semantic} {rendered_items}"
|
|
|
|
|
|
def parse_group(dest: List[GroupDescription], value: str, type=str) -> None:
    """
    Parse a whitespace-separated group value and append it to `dest`.

    The first token becomes the group's semantic; each remaining token is
    converted with `type` and stored as an item. Nothing is appended when
    `value` contains no tokens.
    """
    tokens = value.split()
    if not tokens:
        return
    semantic, *raw_items = tokens
    dest.append(
        GroupDescription(semantic=semantic, items=[type(item) for item in raw_items])
    )
|
|
|
|
|
|
@dataclass
class SsrcDescription:
    """
    The attributes collected from "a=ssrc:<ssrc> <name>:<value>" lines for
    a single SSRC. Attributes that were not seen stay None.
    """

    ssrc: int
    cname: Optional[str] = None
    msid: Optional[str] = None
    mslabel: Optional[str] = None
    label: Optional[str] = None


# Names of the SsrcDescription fields that are read from and written to
# "a=ssrc" lines, in the order they are serialised.
SSRC_INFO_ATTRS = ["cname", "msid", "mslabel", "label"]
|
|
|
|
|
|
class MediaDescription:
|
|
def __init__(self, kind: str, port: int, profile: str, fmt: List[Any]) -> None:
|
|
|
|
self.kind = kind
|
|
self.port = port
|
|
self.host: Optional[str] = None
|
|
self.profile = profile
|
|
self.direction: Optional[str] = None
|
|
self.msid: Optional[str] = None
|
|
|
|
|
|
self.rtcp_port: Optional[int] = None
|
|
self.rtcp_host: Optional[str] = None
|
|
self.rtcp_mux = False
|
|
|
|
|
|
self.ssrc: List[SsrcDescription] = []
|
|
self.ssrc_group: List[GroupDescription] = []
|
|
|
|
|
|
self.fmt = fmt
|
|
self.rtp = RTCRtpParameters()
|
|
|
|
|
|
self.sctpCapabilities: Optional[RTCSctpCapabilities] = None
|
|
self.sctpmap: Dict[int, str] = {}
|
|
self.sctp_port: Optional[int] = None
|
|
|
|
|
|
self.dtls: Optional[RTCDtlsParameters] = None
|
|
|
|
|
|
self.ice: Optional[RTCIceParameters] = None
|
|
self.ice_candidates: List[RTCIceCandidate] = []
|
|
self.ice_candidates_complete = False
|
|
self.ice_options: Optional[str] = None
|
|
|
|
def __str__(self) -> str:
|
|
lines = []
|
|
lines.append(
|
|
f"m={self.kind} {self.port} {self.profile} {' '.join(map(str, self.fmt))}"
|
|
)
|
|
if self.host is not None:
|
|
lines.append(f"c={ipaddress_to_sdp(self.host)}")
|
|
if self.direction is not None:
|
|
lines.append(f"a={self.direction}")
|
|
|
|
for header in self.rtp.headerExtensions:
|
|
lines.append(f"a=extmap:{header.id} {header.uri}")
|
|
|
|
if self.rtp.muxId:
|
|
lines.append(f"a=mid:{self.rtp.muxId}")
|
|
|
|
if self.msid:
|
|
lines.append(f"a=msid:{self.msid}")
|
|
|
|
if self.rtcp_port is not None and self.rtcp_host is not None:
|
|
lines.append(f"a=rtcp:{self.rtcp_port} {ipaddress_to_sdp(self.rtcp_host)}")
|
|
if self.rtcp_mux:
|
|
lines.append("a=rtcp-mux")
|
|
|
|
for group in self.ssrc_group:
|
|
lines.append(f"a=ssrc-group:{group}")
|
|
for ssrc_info in self.ssrc:
|
|
for ssrc_attr in SSRC_INFO_ATTRS:
|
|
ssrc_value = getattr(ssrc_info, ssrc_attr)
|
|
if ssrc_value is not None:
|
|
lines.append(f"a=ssrc:{ssrc_info.ssrc} {ssrc_attr}:{ssrc_value}")
|
|
|
|
for codec in self.rtp.codecs:
|
|
lines.append(f"a=rtpmap:{codec.payloadType} {codec}")
|
|
|
|
|
|
for feedback in codec.rtcpFeedback:
|
|
value = feedback.type
|
|
if feedback.parameter:
|
|
value += f" {feedback.parameter}"
|
|
lines.append(f"a=rtcp-fb:{codec.payloadType} {value}")
|
|
|
|
|
|
params = parameters_to_sdp(codec.parameters)
|
|
if params:
|
|
lines.append(f"a=fmtp:{codec.payloadType} {params}")
|
|
|
|
for k, v in self.sctpmap.items():
|
|
lines.append(f"a=sctpmap:{k} {v}")
|
|
if self.sctp_port is not None:
|
|
lines.append(f"a=sctp-port:{self.sctp_port}")
|
|
if self.sctpCapabilities is not None:
|
|
lines.append(f"a=max-message-size:{self.sctpCapabilities.maxMessageSize}")
|
|
|
|
|
|
for candidate in self.ice_candidates:
|
|
lines.append("a=candidate:" + candidate_to_sdp(candidate))
|
|
if self.ice_candidates_complete:
|
|
lines.append("a=end-of-candidates")
|
|
if self.ice.usernameFragment is not None:
|
|
lines.append(f"a=ice-ufrag:{self.ice.usernameFragment}")
|
|
if self.ice.password is not None:
|
|
lines.append(f"a=ice-pwd:{self.ice.password}")
|
|
if self.ice_options is not None:
|
|
lines.append(f"a=ice-options:{self.ice_options}")
|
|
|
|
|
|
if self.dtls:
|
|
for fingerprint in self.dtls.fingerprints:
|
|
lines.append(
|
|
f"a=fingerprint:{fingerprint.algorithm} {fingerprint.value}"
|
|
)
|
|
lines.append(f"a=setup:{DTLS_ROLE_SETUP[self.dtls.role]}")
|
|
|
|
return "\r\n".join(lines) + "\r\n"
|
|
|
|
|
|
class SessionDescription:
    """
    A complete SDP description: session-level fields plus a list of
    MediaDescription sections.

    Use `SessionDescription.parse` to build one from SDP text and `str()` to
    render it back to SDP text.
    """

    def __init__(self) -> None:
        self.version = 0
        self.origin: Optional[str] = None
        self.name = "-"
        self.time = "0 0"
        self.host: Optional[str] = None
        self.group: List[GroupDescription] = []
        self.msid_semantic: List[GroupDescription] = []
        self.media: List[MediaDescription] = []
        self.type: Optional[str] = None

    @classmethod
    def parse(cls, sdp: str):
        """
        Parse SDP text into a SessionDescription.

        Session-level ICE and DTLS attributes seed the corresponding
        parameters of every media section; attributes found inside a media
        section then add to them (fingerprints) or replace them (ICE
        credentials, ICE options, DTLS role). A media section that ends up
        without a DTLS role has its `dtls` set to None.

        :param sdp: The SDP text.
        :return: The parsed session description.
        """
        current_media: Optional[MediaDescription] = None
        dtls_fingerprints = []
        dtls_role = None
        ice_lite = False
        ice_options = None
        ice_password = None
        ice_usernameFragment = None

        def find_codec(pt: int) -> RTCRtpCodecParameters:
            # Raises StopIteration if the current media has no such codec.
            return next(filter(lambda x: x.payloadType == pt, current_media.rtp.codecs))

        session_lines, media_groups = grouplines(sdp)

        # parse session
        session = cls()
        for line in session_lines:
            if line.startswith("v="):
                session.version = int(line.strip()[2:])
            elif line.startswith("o="):
                session.origin = line.strip()[2:]
            elif line.startswith("s="):
                session.name = line.strip()[2:]
            elif line.startswith("c="):
                session.host = ipaddress_from_sdp(line[2:])
            elif line.startswith("t="):
                session.time = line.strip()[2:]
            elif line.startswith("a="):
                attr, value = parse_attr(line)
                if attr == "fingerprint":
                    algorithm, fingerprint = value.split()
                    dtls_fingerprints.append(
                        RTCDtlsFingerprint(algorithm=algorithm, value=fingerprint)
                    )
                elif attr == "ice-lite":
                    ice_lite = True
                elif attr == "ice-options":
                    ice_options = value
                elif attr == "ice-pwd":
                    ice_password = value
                elif attr == "ice-ufrag":
                    ice_usernameFragment = value
                elif attr == "group":
                    parse_group(session.group, value)
                elif attr == "msid-semantic":
                    parse_group(session.msid_semantic, value)
                elif attr == "setup":
                    dtls_role = DTLS_SETUP_ROLE[value]

        # parse media
        for media_lines in media_groups:
            m = re.match("^m=([^ ]+) ([0-9]+) ([A-Z/]+) (.+)$", media_lines[0])
            assert m

            # check payload types are valid
            kind = m.group(1)
            fmt = m.group(4).split()
            fmt_int: Optional[List[int]] = None
            if kind in ["audio", "video"]:
                fmt_int = [int(x) for x in fmt]
                for pt in fmt_int:
                    assert pt >= 0 and pt < 256
                    assert pt not in rtp.FORBIDDEN_PAYLOAD_TYPES

            current_media = MediaDescription(
                kind=kind, port=int(m.group(2)), profile=m.group(3), fmt=fmt_int or fmt
            )
            # Start from the session-level defaults; the fingerprint list is
            # copied so media sections do not share it.
            current_media.dtls = RTCDtlsParameters(
                fingerprints=dtls_fingerprints[:], role=dtls_role
            )
            current_media.ice = RTCIceParameters(
                iceLite=ice_lite,
                usernameFragment=ice_usernameFragment,
                password=ice_password,
            )
            current_media.ice_options = ice_options
            session.media.append(current_media)

            for line in media_lines[1:]:
                if line.startswith("c="):
                    current_media.host = ipaddress_from_sdp(line[2:])
                elif line.startswith("a="):
                    attr, value = parse_attr(line)
                    if attr == "candidate":
                        current_media.ice_candidates.append(candidate_from_sdp(value))
                    elif attr == "end-of-candidates":
                        current_media.ice_candidates_complete = True
                    elif attr == "extmap":
                        ext_id, ext_uri = value.split()
                        if "/" in ext_id:
                            # Only the numeric id is kept; the part after
                            # "/" is discarded.
                            ext_id, _ = ext_id.split("/")
                        extension = RTCRtpHeaderExtensionParameters(
                            id=int(ext_id), uri=ext_uri
                        )
                        current_media.rtp.headerExtensions.append(extension)
                    elif attr == "fingerprint":
                        algorithm, fingerprint = value.split()
                        current_media.dtls.fingerprints.append(
                            RTCDtlsFingerprint(algorithm=algorithm, value=fingerprint)
                        )
                    elif attr == "ice-options":
                        current_media.ice_options = value
                    elif attr == "ice-pwd":
                        current_media.ice.password = value
                    elif attr == "ice-ufrag":
                        current_media.ice.usernameFragment = value
                    elif attr == "max-message-size":
                        current_media.sctpCapabilities = RTCSctpCapabilities(
                            maxMessageSize=int(value)
                        )
                    elif attr == "mid":
                        current_media.rtp.muxId = value
                    elif attr == "msid":
                        current_media.msid = value
                    elif attr == "rtcp":
                        # The address after the port is optional (RFC 3605),
                        # so "a=rtcp:<port>" alone must be accepted.
                        port, _, rest = value.partition(" ")
                        current_media.rtcp_port = int(port)
                        if rest:
                            current_media.rtcp_host = ipaddress_from_sdp(rest)
                    elif attr == "rtcp-mux":
                        current_media.rtcp_mux = True
                    elif attr == "setup":
                        current_media.dtls.role = DTLS_SETUP_ROLE[value]
                    elif attr in DIRECTIONS:
                        current_media.direction = attr
                    elif attr == "rtpmap":
                        format_id, format_desc = value.split(" ", 1)
                        bits = format_desc.split("/")
                        if current_media.kind == "audio":
                            # Audio defaults to one channel when the count
                            # is omitted.
                            if len(bits) > 2:
                                channels = int(bits[2])
                            else:
                                channels = 1
                        else:
                            channels = None
                        codec = RTCRtpCodecParameters(
                            mimeType=current_media.kind + "/" + bits[0],
                            channels=channels,
                            clockRate=int(bits[1]),
                            payloadType=int(format_id),
                        )
                        current_media.rtp.codecs.append(codec)
                    elif attr == "sctpmap":
                        format_id, format_desc = value.split(" ", 1)
                        current_media.sctpmap[int(format_id)] = format_desc
                    elif attr == "sctp-port":
                        current_media.sctp_port = int(value)
                    elif attr == "ssrc-group":
                        parse_group(current_media.ssrc_group, value, type=int)
                    elif attr == "ssrc":
                        ssrc_str, ssrc_desc = value.split(" ", 1)
                        ssrc = int(ssrc_str)
                        ssrc_attr, ssrc_value = ssrc_desc.split(":", 1)

                        # Reuse the entry for this SSRC if one exists.
                        try:
                            ssrc_info = next(
                                (x for x in current_media.ssrc if x.ssrc == ssrc)
                            )
                        except StopIteration:
                            ssrc_info = SsrcDescription(ssrc=ssrc)
                            current_media.ssrc.append(ssrc_info)
                        if ssrc_attr in SSRC_INFO_ATTRS:
                            setattr(ssrc_info, ssrc_attr, ssrc_value)

            if current_media.dtls.role is None:
                current_media.dtls = None

            # Second pass: these attributes refer to codecs, which must all
            # have been parsed first.
            for line in media_lines[1:]:
                if line.startswith("a="):
                    attr, value = parse_attr(line)
                    if attr == "fmtp":
                        format_id, format_desc = value.split(" ", 1)
                        codec = find_codec(int(format_id))
                        codec.parameters = parameters_from_sdp(format_desc)
                    elif attr == "rtcp-fb":
                        bits = value.split(" ", 2)
                        # "*" applies the feedback to every codec.
                        for codec in current_media.rtp.codecs:
                            if bits[0] in ["*", str(codec.payloadType)]:
                                codec.rtcpFeedback.append(
                                    RTCRtcpFeedback(
                                        type=bits[1],
                                        parameter=bits[2] if len(bits) > 2 else None,
                                    )
                                )

        return session

    def webrtc_track_id(self, media: MediaDescription) -> Optional[str]:
        """
        Return the track id from `media.msid`, or None.

        The msid must have the form "<stream> <track>", and a "WMS"
        msid-semantic group must list either the stream id or "*".
        `media` must be one of this session's media sections.
        """
        assert media in self.media
        if media.msid is not None and " " in media.msid:
            bits = media.msid.split()
            for group in self.msid_semantic:
                if group.semantic == "WMS" and (
                    bits[0] in group.items or "*" in group.items
                ):
                    return bits[1]
        return None

    def __str__(self) -> str:
        """Return the whole description as SDP text with CRLF line endings."""
        lines = [f"v={self.version}", f"o={self.origin}", f"s={self.name}"]
        if self.host is not None:
            lines += [f"c={ipaddress_to_sdp(self.host)}"]
        lines += [f"t={self.time}"]
        # A media section's `ice` may still be None, so check it before
        # reading iceLite.
        if any(m.ice is not None and m.ice.iceLite for m in self.media):
            lines += ["a=ice-lite"]
        for group in self.group:
            lines += [f"a=group:{group}"]
        for group in self.msid_semantic:
            lines += [f"a=msid-semantic:{group}"]
        return "\r\n".join(lines) + "\r\n" + "".join([str(m) for m in self.media])
|
|
|