import asyncio
import fractions
import time
import uuid
from abc import ABCMeta, abstractmethod
from typing import Tuple, Union

from av import AudioFrame, VideoFrame
from av.frame import Frame
from av.packet import Packet
from pyee.asyncio import AsyncIOEventEmitter

# Audio is packetized in 20 ms frames; video uses the standard 90 kHz RTP clock
# at a nominal 30 frames per second.
AUDIO_PTIME = 0.020
VIDEO_CLOCK_RATE = 90000
VIDEO_PTIME = 1 / 30
VIDEO_TIME_BASE = fractions.Fraction(1, VIDEO_CLOCK_RATE)
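
# For reference: each video frame advances the RTP timestamp by
# int(VIDEO_PTIME * VIDEO_CLOCK_RATE) == 3000 clock ticks, and each audio frame
# carries int(AUDIO_PTIME * 8000) == 160 samples at the 8 kHz rate used by the
# dummy audio track below.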


def convert_timebase(
    pts: int, from_base: fractions.Fraction, to_base: fractions.Fraction
) -> int:
    if from_base != to_base:
        pts = int(pts * from_base / to_base)
    return pts
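
# Worked example (illustrative, not part of the library): a pts of 90000 in
# VIDEO_TIME_BASE (1/90000 s) converted to a 1/1000 s time base gives
# convert_timebase(90000, VIDEO_TIME_BASE, fractions.Fraction(1, 1000)) == 1000,
# i.e. one second expressed in milliseconds.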


class MediaStreamError(Exception):
    pass


class MediaStreamTrack(AsyncIOEventEmitter, metaclass=ABCMeta):
    """
    A single media track within a stream.
    """

    kind = "unknown"

    def __init__(self) -> None:
        super().__init__()
        self.__ended = False
        self._id = str(uuid.uuid4())

    @property
    def id(self) -> str:
        """
        An automatically generated globally unique ID.
        """
        return self._id

    @property
    def readyState(self) -> str:
        return "ended" if self.__ended else "live"

    @abstractmethod
    async def recv(self) -> Union[Frame, Packet]:
        """
        Receive the next :class:`~av.audio.frame.AudioFrame`,
        :class:`~av.video.frame.VideoFrame` or :class:`~av.packet.Packet`
        """

    def stop(self) -> None:
        if not self.__ended:
            self.__ended = True
            self.emit("ended")

            # No more events will be emitted, so remove all event listeners
            # to facilitate garbage collection.
            self.remove_all_listeners()
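

# A track emits an "ended" event (via AsyncIOEventEmitter) when stop() is
# called. A hypothetical consumer sketch, not part of this module:
#
#     @track.on("ended")
#     def on_ended() -> None:
#         print("track ended")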


class AudioStreamTrack(MediaStreamTrack):
    """
    A dummy audio track which reads silence.
    """

    kind = "audio"

    _start: float
    _timestamp: int

    async def recv(self) -> Frame:
        """
        Receive the next :class:`~av.audio.frame.AudioFrame`.

        The base implementation just reads silence, subclass
        :class:`AudioStreamTrack` to provide a useful implementation.
        """
        if self.readyState != "live":
            raise MediaStreamError

        sample_rate = 8000
        samples = int(AUDIO_PTIME * sample_rate)

        if hasattr(self, "_timestamp"):
            # Pace delivery against the wall clock: advance the timestamp by
            # one packet's worth of samples and sleep until the frame is due.
            self._timestamp += samples
            wait = self._start + (self._timestamp / sample_rate) - time.time()
            await asyncio.sleep(wait)
        else:
            self._start = time.time()
            self._timestamp = 0

        frame = AudioFrame(format="s16", layout="mono", samples=samples)
        for p in frame.planes:
            p.update(bytes(p.buffer_size))
        frame.pts = self._timestamp
        frame.sample_rate = sample_rate
        frame.time_base = fractions.Fraction(1, sample_rate)
        return frame


class VideoStreamTrack(MediaStreamTrack):
    """
    A dummy video track which reads green frames.
    """

    kind = "video"

    _start: float
    _timestamp: int

    async def next_timestamp(self) -> Tuple[int, fractions.Fraction]:
        if self.readyState != "live":
            raise MediaStreamError

        if hasattr(self, "_timestamp"):
            # Advance the timestamp by one frame period expressed in 90 kHz
            # clock units and sleep until the frame is due.
            self._timestamp += int(VIDEO_PTIME * VIDEO_CLOCK_RATE)
            wait = self._start + (self._timestamp / VIDEO_CLOCK_RATE) - time.time()
            await asyncio.sleep(wait)
        else:
            self._start = time.time()
            self._timestamp = 0
        return self._timestamp, VIDEO_TIME_BASE

    async def recv(self) -> Frame:
        """
        Receive the next :class:`~av.video.frame.VideoFrame`.

        The base implementation just reads a 640x480 green frame at 30 fps,
        subclass :class:`VideoStreamTrack` to provide a useful implementation.
        """
        pts, time_base = await self.next_timestamp()

        frame = VideoFrame(width=640, height=480)
        for p in frame.planes:
            p.update(bytes(p.buffer_size))
        frame.pts = pts
        frame.time_base = time_base
        return frame
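

if __name__ == "__main__":
    # Minimal usage sketch (an illustrative addition, not part of the library
    # API): pull a few frames from the dummy tracks to show the pacing and
    # timestamping behaviour.
    async def _demo() -> None:
        audio = AudioStreamTrack()
        video = VideoStreamTrack()
        for _ in range(3):
            a = await audio.recv()
            v = await video.recv()
            print("audio pts", a.pts, "video pts", v.pts)
        audio.stop()
        video.stop()

    asyncio.run(_demo())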