import asyncio
import fractions
import time
import uuid
from abc import ABCMeta, abstractmethod
from typing import Tuple, Union
from av import AudioFrame, VideoFrame
from av.frame import Frame
from av.packet import Packet
from pyee.asyncio import AsyncIOEventEmitter

AUDIO_PTIME = 0.020  # 20ms audio packetization
VIDEO_CLOCK_RATE = 90000
VIDEO_PTIME = 1 / 30  # 30fps
VIDEO_TIME_BASE = fractions.Fraction(1, VIDEO_CLOCK_RATE)


def convert_timebase(
pts: int, from_base: fractions.Fraction, to_base: fractions.Fraction
) -> int:
if from_base != to_base:
pts = int(pts * from_base / to_base)
return pts
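

# A quick worked example for convert_timebase() (illustrative comment, not part
# of the original module): one frame interval at 30 fps is
# int(VIDEO_PTIME * VIDEO_CLOCK_RATE) == 3000 ticks in VIDEO_TIME_BASE (1/90000);
# rescaled to a 1/1000 (millisecond) time base that becomes
# convert_timebase(3000, VIDEO_TIME_BASE, fractions.Fraction(1, 1000)) == 33.

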
class MediaStreamError(Exception):
pass


class MediaStreamTrack(AsyncIOEventEmitter, metaclass=ABCMeta):
    """
    A single media track within a stream.
    """

    kind = "unknown"

    def __init__(self) -> None:
        super().__init__()
        self.__ended = False
        self._id = str(uuid.uuid4())

    @property
    def id(self) -> str:
        """
        An automatically generated globally unique ID.
        """
        return self._id

    @property
    def readyState(self) -> str:
        return "ended" if self.__ended else "live"

    @abstractmethod
    async def recv(self) -> Union[Frame, Packet]:
        """
        Receive the next :class:`~av.audio.frame.AudioFrame`,
        :class:`~av.video.frame.VideoFrame` or :class:`~av.packet.Packet`
        """

    def stop(self) -> None:
        if not self.__ended:
            self.__ended = True
            self.emit("ended")

            # no more events will be emitted, so remove all event listeners
            # to facilitate garbage collection.
            self.remove_all_listeners()
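

# Usage sketch (illustrative comment, not part of aiortc): tracks are asyncio
# event emitters, so consumers can watch the lifecycle exposed by readyState
# and stop(). "AudioStreamTrack" below refers to the dummy track defined
# further down in this module.
#
#     track = AudioStreamTrack()
#
#     @track.on("ended")
#     def on_ended() -> None:
#         print("track", track.id, "ended, readyState is", track.readyState)
#
#     track.stop()  # emits "ended" once, then drops all listeners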


class AudioStreamTrack(MediaStreamTrack):
    """
    A dummy audio track which reads silence.
    """

    kind = "audio"

    _start: float
    _timestamp: int

    async def recv(self) -> Frame:
        """
        Receive the next :class:`~av.audio.frame.AudioFrame`.

        The base implementation just reads silence, subclass
        :class:`AudioStreamTrack` to provide a useful implementation.
        """
        if self.readyState != "live":
            raise MediaStreamError

        sample_rate = 8000
        samples = int(AUDIO_PTIME * sample_rate)

        if hasattr(self, "_timestamp"):
            # advance the timestamp by one packet and sleep until the
            # wall-clock time at which this frame is due
            self._timestamp += samples
            wait = self._start + (self._timestamp / sample_rate) - time.time()
            await asyncio.sleep(wait)
        else:
            self._start = time.time()
            self._timestamp = 0

        # an all-zero s16 payload is silence
        frame = AudioFrame(format="s16", layout="mono", samples=samples)
        for p in frame.planes:
            p.update(bytes(p.buffer_size))
        frame.pts = self._timestamp
        frame.sample_rate = sample_rate
        frame.time_base = fractions.Fraction(1, sample_rate)
        return frame
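

# Illustrative sketch, not part of aiortc: a subclass that keeps the base
# class's pacing and timestamping but replaces the silent payload with a sine
# tone. The class name and "frequency" attribute are invented for this example,
# and numpy is assumed to be installed.
class ToneAudioStreamTrack(AudioStreamTrack):
    """
    An example audio track which reads a sine tone instead of silence.
    """

    frequency = 440.0  # Hz (assumed tone)

    async def recv(self) -> Frame:
        import numpy as np  # assumed dependency, used only by this example

        # let the base implementation handle timing, pts and sample_rate ...
        silent = await super().recv()
        t = (silent.pts + np.arange(silent.samples)) / silent.sample_rate
        pcm = (0.3 * 32767 * np.sin(2 * np.pi * self.frequency * t)).astype(np.int16)

        # ... then emit a new frame carrying the generated samples.
        frame = AudioFrame.from_ndarray(pcm.reshape(1, -1), format="s16", layout="mono")
        frame.pts = silent.pts
        frame.sample_rate = silent.sample_rate
        frame.time_base = silent.time_base
        return frame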


class VideoStreamTrack(MediaStreamTrack):
    """
    A dummy video track which reads green frames.
    """

    kind = "video"

    _start: float
    _timestamp: int

    async def next_timestamp(self) -> Tuple[int, fractions.Fraction]:
        if self.readyState != "live":
            raise MediaStreamError

        if hasattr(self, "_timestamp"):
            # advance the timestamp by one frame interval and sleep until
            # the wall-clock time at which this frame is due
            self._timestamp += int(VIDEO_PTIME * VIDEO_CLOCK_RATE)
            wait = self._start + (self._timestamp / VIDEO_CLOCK_RATE) - time.time()
            await asyncio.sleep(wait)
        else:
            self._start = time.time()
            self._timestamp = 0
        return self._timestamp, VIDEO_TIME_BASE

    async def recv(self) -> Frame:
        """
        Receive the next :class:`~av.video.frame.VideoFrame`.

        The base implementation just reads a 640x480 green frame at 30fps,
        subclass :class:`VideoStreamTrack` to provide a useful implementation.
        """
        pts, time_base = await self.next_timestamp()

        # all-zero planes in the default yuv420p format render as green
        frame = VideoFrame(width=640, height=480)
        for p in frame.planes:
            p.update(bytes(p.buffer_size))
        frame.pts = pts
        frame.time_base = time_base
        return frame
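

# Illustrative sketch, not part of aiortc: a subclass that reuses
# next_timestamp() for pacing but fills the picture with a flat mid gray
# instead of green. The class name and the chosen color are invented for this
# example.
class SolidColorVideoStreamTrack(VideoStreamTrack):
    """
    An example video track which reads gray frames.
    """

    async def recv(self) -> Frame:
        pts, time_base = await self.next_timestamp()

        # in yuv420p, luma 128 with neutral chroma (128) is a flat mid gray
        frame = VideoFrame(width=640, height=480, format="yuv420p")
        for p in frame.planes:
            p.update(b"\x80" * p.buffer_size)
        frame.pts = pts
        frame.time_base = time_base
        return frame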