jellyfin-mpv-shim/jellyfin_mpv_shim/syncplay.py

644 lines
22 KiB
Python
Raw Normal View History

2020-08-08 03:56:48 +00:00
import logging
import threading
import os
from datetime import datetime, timedelta
from .clients import clientManager
2020-08-08 03:56:48 +00:00
from .media import Media
2020-08-10 07:33:08 +00:00
from .i18n import _
2020-08-08 03:56:48 +00:00
# This is based on: https://github.com/jellyfin/jellyfin-web/blob/master/src/components/syncPlay/syncPlayManager.js
from .conf import settings
2020-08-22 20:13:35 +00:00
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from .player import PlayerManager as PlayerManager_type
2020-08-08 03:56:48 +00:00
2020-08-22 15:38:07 +00:00
# Module-wide logger used by every SyncPlay helper in this file.
log = logging.getLogger("syncplay")

# Number of ticks in one second; positions are converted between seconds and
# ticks with this factor throughout the module.
seconds_in_ticks = 10000000

# Group-update types that carry no action, only a notice for the user.
# Maps the update type to the translated message shown via player_message().
info_commands = {
    "GroupDoesNotExist": _("The specified SyncPlay group does not exist."),
    "CreateGroupDenied": _("Creating SyncPlay groups is not allowed."),
    "JoinGroupDenied": _("SyncPlay group access was denied."),
    "LibraryAccessDenied": _("Access to the SyncPlay library was denied."),
}
2020-08-22 20:13:35 +00:00
def _parse_precise_time(time: str):
    """Parse a server timestamp such as ``2020-08-08T03:56:48.1234567Z``.

    The fractional part may have any number of digits (or be absent).
    Python's ``%f`` only accepts up to six, so extra digits are truncated and
    missing ones are zero-padded instead of blindly slicing the string.

    :param time: ISO-8601 style timestamp, optionally terminated by ``Z``.
    :return: A naive ``datetime`` (no tzinfo attached).
    :raises ValueError: If the text is not a timestamp in the expected layout.
    """
    stamp = time.strip()
    if stamp.endswith(("Z", "z")):
        stamp = stamp[:-1]

    whole, _sep, fraction = stamp.partition(".")
    parsed = datetime.strptime(whole, "%Y-%m-%dT%H:%M:%S")
    if fraction:
        # Keep microsecond precision: "5" -> 500000, "1234567" -> 123456.
        parsed = parsed.replace(microsecond=int(fraction[:6].ljust(6, "0")))
    return parsed
class TimeoutThread(threading.Thread):
    """Daemon thread that calls ``action(*args)`` once after a delay.

    :param action: Callable to invoke when the delay elapses.
    :param delay: Delay in milliseconds.
    :param args: Positional arguments passed to ``action``.
    """

    def __init__(self, action, delay: float, args):
        super().__init__(daemon=True)
        self.action = action
        self.delay = delay
        self.args = args
        # Set by stop() to cancel the pending call.
        self.halt = threading.Event()

    def run(self):
        """Wait for the delay; run the action unless stop() was called first."""
        # Event.wait returns True when the flag was set before the timeout.
        if self.halt.wait(timeout=self.delay / 1000):
            return
        try:
            self.action(*self.args)
        except Exception:
            # Only ordinary errors are logged; SystemExit/KeyboardInterrupt
            # are no longer swallowed.
            log.error("TimeoutThread crashed.", exc_info=True)

    def stop(self):
        """Cancel the pending call and wait for the thread to finish.

        Safe to call from the action itself: a thread cannot join itself, so
        the join is skipped in that case (and when the thread is not running).
        """
        self.halt.set()
        if self.is_alive() and threading.current_thread() is not self:
            self.join()
def set_timeout(ms: float, callback, *args):
    """Run ``callback(*args)`` after ``ms`` milliseconds, like JS ``setTimeout``.

    :return: A zero-argument callable that cancels the pending call.
    """
    timer = TimeoutThread(callback, ms, args)
    timer.start()
    cancel = timer.stop
    return cancel
class SyncPlayManager:
2020-08-22 20:13:35 +00:00
def __init__(self, manager: "PlayerManager_type"):
    """Create a disabled SyncPlay controller bound to *manager*."""
    # Collaborators.
    self.playerManager = manager
    self.menu = manager.menu
    self.client = None
    self.timesync = None

    # Group / session state.
    self.current_group = None
    self.enabled_at = None
    self.ready = False
    self.notify_sync_ready = False
    self.read_callback = None
    self.playqueue_last_updated = None

    # Command bookkeeping; the last two hold cancel callables from set_timeout.
    self.last_command = None
    self.queued_command = None
    self.scheduled_command = None
    self.sync_timeout = None

    # Drift-correction state.
    self.sync_enabled = False
    self.enable_speed_sync = True
    self.attempts = 0
    self.method = None  # NOTE(review): not read anywhere in this file.
    self.playback_diff_ms = 0
    self.playback_rate = 1
    self.last_sync_time = datetime.utcnow()

    # Clock offset reported by the time-sync subscription.
    self.time_offset = timedelta(0)
    self.round_trip_duration = timedelta(0)

    # Buffering detection.
    self.is_buffering = False
    self.last_playback_waiting = None
    self.min_buffer_thresh_ms = 1000
2020-08-08 03:56:48 +00:00
# On playback time update (call from timeline push)
def sync_playback_time(self):
    """Compare local playback with the group's position and correct drift.

    Only acts while the last command is ``Unpause``, the player is actually
    playing, not buffering, and no menu is shown. Checks are rate-limited to
    one per ``settings.sync_method_thresh / 2`` milliseconds.

    Two corrections exist: a temporary speed change for small drift and an
    absolute seek for large drift. Each disables further syncing until its
    timer fires. The timer's cancel handle is stored in ``self.sync_timeout``
    so that ``clear_scheduled_command`` can cancel it when a new command
    arrives; otherwise a stale callback could re-enable syncing or reset the
    speed afterwards.
    """
    if (
        not self.last_command
        or self.last_command["Command"] != "Unpause"
        or self.is_buffering
        or not self.playerManager.is_not_paused()
        or self.playerManager.menu.is_menu_shown
    ):
        log.debug("Not syncing due to no playback.")
        return

    current_time = datetime.utcnow()

    # Avoid overloading player
    elapsed = current_time - self.last_sync_time
    if elapsed.total_seconds() * 1000 < settings.sync_method_thresh / 2:
        log.debug("Not syncing due to threshold.")
        return

    self.last_sync_time = current_time
    play_at_time = self.last_command["When"]

    current_position_ticks = int(self.playerManager.get_time() * seconds_in_ticks)
    # Where the group should be now: command position plus the time elapsed
    # since the command's start time, adjusted by the clock offset.
    server_position_ticks = (
        self.last_command["PositionTicks"]
        + ((current_time - play_at_time) + self.time_offset).total_seconds()
        * seconds_in_ticks
    )

    # 10,000 ticks per millisecond.
    diff_ms = (server_position_ticks - current_position_ticks) / 10000
    self.playback_diff_ms = diff_ms

    if not self.sync_enabled:
        return

    abs_diff_ms = abs(diff_ms)
    if (
        self.enable_speed_sync
        and settings.sync_max_delay_speed
        < abs_diff_ms
        < settings.sync_method_thresh
    ):
        if self.attempts > settings.sync_speed_attempts:
            self.enable_speed_sync = False
            return

        # Speed To Sync Method
        speed = 1 + diff_ms / settings.sync_speed_time
        self.playerManager.set_speed(speed)
        self.sync_enabled = False
        self.attempts += 1
        log.info("SyncPlay Speed to Sync rate: {0}".format(speed))
        self.player_message(_("SpeedToSync (x{0})").format(speed))

        def restore_speed():
            self.playerManager.set_speed(1)
            self.sync_enabled = True

        # Keep the cancel handle so a new command can abort the correction.
        self.sync_timeout = set_timeout(settings.sync_speed_time, restore_speed)
    elif abs_diff_ms > settings.sync_max_delay_skip:
        if self.attempts > settings.sync_attempts:
            self.sync_enabled = False
            log.info("SyncPlay Sync Disabled due to too many attempts.")
            self.player_message(_("Sync Disabled (Too Many Attempts)"))
            return

        # Skip To Sync Method
        self.local_seek(server_position_ticks / seconds_in_ticks)
        self.sync_enabled = False
        self.attempts += 1
        log.info("SyncPlay Skip to Sync Activated")
        self.player_message(_("SkipToSync (x{0})").format(self.attempts))

        def reenable_sync():
            self.sync_enabled = True

        # Keep the cancel handle so a new command can abort the correction.
        self.sync_timeout = set_timeout(
            settings.sync_method_thresh / 2, reenable_sync
        )
    else:
        if self.attempts > 0:
            log.info(
                "Playback synced after {0} attempts.".format(self.attempts)
            )
        self.attempts = 0
# On timesync update
2020-08-22 20:13:35 +00:00
def on_timesync_update(self, time_offset: timedelta, ping: timedelta):
    """Record a fresh clock offset/ping and fire the pending ready callback.

    :param time_offset: Offset stored as ``self.time_offset``.
    :param ping: Measured ping; the round-trip duration is twice this value.
    """
    self.time_offset = time_offset
    self.round_trip_duration = ping * 2

    if self.notify_sync_ready:
        self.ready = True
        self.notify_sync_ready = False
        pending = self.read_callback
        if pending:
            pending()
            self.read_callback = None

    if not self.sync_enabled:
        return

    # Server responds with 400 bad request...
    ping_ms = ping.total_seconds() * 1000
    try:
        self.client.jellyfin.ping_sync_play(ping_ms)
    except Exception:
        log.error("Syncplay ping reporting failed.")
2020-08-08 03:56:48 +00:00
def enable_sync_play(self, from_server: bool):
    """Switch SyncPlay on and wait for the next time-sync update.

    :param from_server: True when triggered by a server group update. In that
        case ``self.client`` is used as-is and a message is shown to the user;
        otherwise the current client is fetched from the player manager.
    """
    # Remember the current speed so disable_sync_play() can restore it.
    self.playback_rate = self.playerManager.get_speed()
    self.enabled_at = datetime.utcnow()
    self.enable_speed_sync = True

    def ready_callback():
        # Runs from on_timesync_update() once the first offset is known.
        self.process_command(self.queued_command)
        self.queued_command = None
        # enabled_at was taken from the local clock; convert it to server time.
        self.enabled_at = self.timesync.local_date_to_server(self.enabled_at)

    self.read_callback = ready_callback
    self.ready = False
    self.notify_sync_ready = True

    if not from_server:
        self.client = self.playerManager.get_current_client()

    timesync = self.client.timesync
    # Detach from a previous time-sync source if the client changed.
    if self.timesync is not None and timesync is not self.timesync:
        self.timesync.remove_subscriber(self.on_timesync_update)
        self.timesync.stop_ping()

    self.timesync = timesync
    self.timesync.subscribe_time_offset(self.on_timesync_update)
    self.timesync.force_update()

    log.info("Syncplay enabled.")
    if from_server:
        self.player_message(_("SyncPlay enabled."))
2020-08-08 03:56:48 +00:00
2020-08-22 20:13:35 +00:00
def disable_sync_play(self, from_server: bool):
    """Switch SyncPlay off and reset all group state.

    Restores the playback speed saved by ``enable_sync_play`` and detaches
    from the time-sync source.

    :param from_server: True when the server already removed us from the
        group (a message is shown). Otherwise a leave request is sent; a
        failure there is logged and does not abort the local teardown.
    """
    self.playerManager.set_speed(self.playback_rate)

    self.enabled_at = None
    self.ready = False
    self.last_command = None
    self.queued_command = None
    self.sync_enabled = False
    self.playqueue_last_updated = None

    if self.timesync is not None:
        self.timesync.remove_subscriber(self.on_timesync_update)
        self.timesync.stop_ping()
        self.timesync = None
    self.current_group = None

    log.info("Syncplay disabled.")
    if from_server:
        self.player_message(_("SyncPlay disabled."))
    else:
        try:
            self.client.jellyfin.leave_sync_play()
        except Exception:
            # Best effort only; SystemExit/KeyboardInterrupt still propagate.
            log.warning("Failed to leave syncplay.", exc_info=True)
def _buffer_req(self, is_buffering):
    """Report this client's buffering/ready state for the current item.

    :param is_buffering: True sends a "buffering" report, False a "ready" one.

    Does nothing when time sync is not set up yet or when no video is loaded:
    without a video there is no playlist item to report on, and dereferencing
    it would raise ``AttributeError``.
    """
    if self.timesync is None:
        # This can get called before it is ready...
        return

    video = self.playerManager.get_video()
    if video is None:
        log.debug("Skipping SyncPlay buffer report: no video is loaded.")
        return

    media = video.parent
    when = self.timesync.local_date_to_server(datetime.utcnow())
    ticks = int(self.playerManager.get_time() * seconds_in_ticks)
    playing = self.playerManager.is_not_paused()
    playlist_id = media.queue[media.seq]["PlaylistItemId"]

    if is_buffering:
        self.client.jellyfin.buffering_sync_play(when, ticks, playing, playlist_id)
    else:
        self.client.jellyfin.ready_sync_play(when, ticks, playing, playlist_id)
2020-08-08 03:56:48 +00:00
# On Buffer
def on_buffer(self):
    """Start the buffering grace period unless one is already pending.

    A buffering report is sent only if the same wait is still pending after
    ``self.min_buffer_thresh_ms`` milliseconds.
    """
    if self.last_playback_waiting:
        return

    started = datetime.utcnow()
    self.last_playback_waiting = started

    def report_if_still_waiting():
        # on_buffer_done() clears last_playback_waiting, cancelling the report.
        if started == self.last_playback_waiting:
            self._buffer_req(True)
            self.is_buffering = True

    set_timeout(self.min_buffer_thresh_ms, report_if_still_waiting)
2020-08-08 03:56:48 +00:00
# On Buffer Done
def on_buffer_done(self):
    """Send a "ready" report if buffering was reported, then reset the state."""
    reported = self.is_buffering
    if reported:
        self._buffer_req(False)

    # Clearing the marker also cancels any pending on_buffer() report.
    self.last_playback_waiting = None
    self.is_buffering = False
2020-08-08 03:56:48 +00:00
def play_done(self):
    """Pause local playback, then send a "ready" (not buffering) report."""
    self.local_pause()
    self._buffer_req(False)
2020-08-08 03:56:48 +00:00
def is_enabled(self) -> bool:
    """Return True while SyncPlay is on (``enabled_at`` has been set)."""
    return self.enabled_at is not None
2020-08-22 20:13:35 +00:00
def process_group_update(self, command: dict):
    """Handle a SyncPlay group-update message according to its ``Type``.

    :param command: Message dict; ``Type`` is always read, ``GroupId`` and
        ``Data`` only for the types that need them.
    """
    kind = command["Type"]
    log.debug("Syncplay group update: {0}".format(command))

    # Pure notices for the user.
    if kind in info_commands:
        self.player_message(info_commands[kind])
        return

    if kind == "PrepareSession":
        self.prepare_session(command["GroupId"], command["Data"])
        return

    if kind == "GroupJoined":
        self.current_group = command["GroupId"]
        self.enable_sync_play(True)
        return

    if kind in ("GroupLeft", "NotInGroup"):
        self.disable_sync_play(True)
        return

    if kind == "UserJoined":
        self.player_message(_("{0} has joined.").format(command["Data"]))
        return

    if kind == "UserLeft":
        self.player_message(_("{0} has left.").format(command["Data"]))
        return

    if kind == "GroupWait":
        self.player_message(_("{0} is buffering.").format(command["Data"]))
        return

    if kind == "PlayQueue":
        self.upd_queue(command["Data"])
        return

    if kind == "StateUpdate":
        state_info = command["Data"]
        log.info(
            "{0} state caused by {1}".format(
                state_info["State"], state_info["Reason"]
            )
        )
        return

    log.error(
        "Unknown SyncPlay command {0} payload {1}.".format(kind, command)
    )
2020-08-08 03:56:48 +00:00
2020-08-22 20:13:35 +00:00
def process_command(self, command: Optional[dict]):
    """Validate a playback command and schedule it.

    Commands are dropped when SyncPlay is disabled, when they were emitted
    before SyncPlay was enabled, or when they repeat the previous command.
    While SyncPlay is not ready yet the command is stored in
    ``self.queued_command`` instead.

    :param command: Command dict (mutated: timestamps are parsed in place),
        or None, which is ignored.
    """
    if command is None:
        return

    if not self.is_enabled():
        log.debug(
            "Ignoring command {0} due to SyncPlay being disabled.".format(command)
        )
        return

    if not self.ready:
        log.debug(
            "Queued command {0} due to SyncPlay not being ready.".format(command)
        )
        self.queued_command = command
        return

    # Replace the timestamp strings with datetime objects.
    command["When"] = _parse_precise_time(command["When"])
    command["EmitttedAt"] = _parse_precise_time(command["EmittedAt"])

    if command["EmitttedAt"] < self.enabled_at:
        log.debug("Ignoring old command {0}.".format(command))
        return

    previous = self.last_command
    if previous and all(
        previous[field] == command[field]
        for field in ("When", "PositionTicks", "Command")
    ):
        log.debug("Ignoring duplicate command {0}.".format(command))
        return

    self.last_command = command
    command_cmd = command["Command"]
    when = command["When"]
    position = command["PositionTicks"]
    log.info(
        "Syncplay will {0} at {1} position {2}".format(command_cmd, when, position)
    )

    if command_cmd == "Unpause":
        self.schedule_play(when, position)
    elif command_cmd == "Pause":
        self.schedule_pause(when, position)
    elif command_cmd == "Seek":
        self.schedule_seek(when, position)
    else:
        log.error("Command {0} is unknown.".format(command_cmd))
2020-08-22 20:13:35 +00:00
def prepare_session(self, group_id: str, session_data: dict):
    """Apply a ``PrepareSession`` update: attach, start, or extend playback.

    :param group_id: Group the session belongs to.
    :param session_data: Session description; read keys include
        ``PlayCommand``, ``StartIndex``, ``ItemIds``, ``PositionTicks`` and
        ``StartPositionTicks``.
    """
    # I think this might be a dead code path
    play_command = session_data.get("PlayCommand")
    # With nothing playing there is no queue to extend, so always start fresh.
    if not self.playerManager.has_video():
        play_command = "PlayNow"

    seq = session_data.get("StartIndex")
    if seq is None:
        seq = 0
    media = Media(
        self.client,
        session_data.get("ItemIds"),
        seq=seq,
        user_id=session_data.get("ControllingUserId"),
        aid=session_data.get("AudioStreamIndex"),
        sid=session_data.get("SubtitleStreamIndex"),
        srcid=session_data.get("MediaSourceId"),
    )

    if (
        self.playerManager.has_video()
        and self.playerManager.get_video().item_id == session_data["ItemIds"][0]
        and play_command == "PlayNow"
    ):
        # We assume the video is already available.
        # Re-parent the current video onto the new queue and seek into place.
        self.playerManager.get_video().parent = media
        log.info("Syncplay Session Prepare: {0} {1}".format(group_id, session_data))
        self.local_seek(
            (session_data.get("PositionTicks", 0) or 0) / seconds_in_ticks
        )
        self.current_group = group_id
    elif play_command == "PlayNow":
        log.info(
            "Syncplay Session Recreate: {0} {1}".format(group_id, session_data)
        )
        offset = session_data.get("StartPositionTicks")
        if offset is not None:
            # Ticks to seconds (same factor as seconds_in_ticks).
            offset /= 10000000

        video = media.video
        if video:
            if settings.pre_media_cmd:
                os.system(settings.pre_media_cmd)
            self.playerManager.play(
                video, offset, no_initial_timeline=True, is_initial_play=True
            )
            self.playerManager.send_timeline()
            # Really not sure why I have to call this.
            self.join_group(group_id)
            self.playerManager.timeline_handle()
            if settings.play_cmd:
                os.system(settings.play_cmd)
    elif play_command == "PlayLast":
        # append=True is passed for PlayLast, append=False for PlayNext below.
        self.playerManager.get_video().parent.insert_items(
            session_data.get("ItemIds"), append=True
        )
        self.playerManager.upd_player_hide()
    elif play_command == "PlayNext":
        self.playerManager.get_video().parent.insert_items(
            session_data.get("ItemIds"), append=False
        )
        self.playerManager.upd_player_hide()
2020-08-22 20:13:35 +00:00
def player_message(self, message: str):
    """Show *message* on the player OSD, or log it when it cannot be shown.

    The message is only logged while the menu is open (it would overwrite the
    menu) or when ``settings.sync_osd_message`` is off.
    """
    # Messages overwrite menu, so they are ignored.
    if self.menu.is_menu_shown:
        log.info("Ignored SyncPlay Message (menu): {0}".format(message))
        return

    if settings.sync_osd_message:
        self.playerManager.show_text(message, 2000)
        return

    log.info("SyncPlay Message: {0}".format(message))
2020-08-22 20:13:35 +00:00
def schedule_play(self, when: datetime, position: int):
    """Start playback at server time *when* from *position* (ticks).

    If *when* is still in the future the player seeks now and unpauses when
    the time arrives. If it has already passed, playback starts immediately at
    the position advanced by the elapsed time. Either way, drift correction is
    re-enabled ``settings.sync_method_thresh / 2`` ms after playback starts.
    """
    self.clear_scheduled_command()
    now = datetime.utcnow()
    local_play_time = self.timesync.server_date_to_local(when)

    def enable_sync():
        self.sync_enabled = True

    def arm_sync_timer():
        self.sync_timeout = set_timeout(
            settings.sync_method_thresh / 2, enable_sync
        )

    if local_play_time > now:
        log.debug("SyncPlay Scheduled Play: Playing Later")
        delay_ms = (local_play_time - now).total_seconds() * 1000
        self.local_seek(position / seconds_in_ticks)

        def start_playback():
            self.local_play()
            arm_sync_timer()

        self.scheduled_command = set_timeout(delay_ms, start_playback)
        return

    log.debug("SyncPlay Scheduled Play: Playing Now")
    # Group playback already started
    late_by = (now - local_play_time).total_seconds()
    target_secs = position / seconds_in_ticks + late_by
    self.local_play()
    self.local_seek(target_secs)
    arm_sync_timer()
2020-08-08 03:56:48 +00:00
2020-08-22 20:13:35 +00:00
def schedule_pause(self, when: datetime, position: int, seek_only: bool = False):
    """Pause and seek to *position* (ticks) at server time *when*.

    :param seek_only: When True the pause is skipped and only the seek runs.

    Runs immediately if *when* has already passed, otherwise on a timer whose
    cancel handle is stored in ``self.scheduled_command``.
    """
    self.clear_scheduled_command()
    now = datetime.utcnow()
    local_pause_time = self.timesync.server_date_to_local(when)

    def apply():
        if not seek_only:
            self.local_pause()
        self.local_seek(position / seconds_in_ticks)

    if local_pause_time <= now:
        log.debug("SyncPlay Scheduled Pause/Seek: Pausing Now")
        apply()
        return

    log.debug("SyncPlay Scheduled Pause/Seek: Pausing Later")
    delay_ms = (local_pause_time - now).total_seconds() * 1000
    self.scheduled_command = set_timeout(delay_ms, apply)
def _play_video(self, video, offset):
    """Play *video* from *offset*, running the configured hook commands.

    ``settings.pre_media_cmd`` runs before and ``settings.play_cmd`` after the
    play call, each only when set. A falsy *video* is logged as an error.
    """
    if not video:
        log.error("No video from queue update.")
        return

    before_cmd = settings.pre_media_cmd
    if before_cmd:
        os.system(before_cmd)

    self.playerManager.play(video, offset, no_initial_timeline=True)

    after_cmd = settings.play_cmd
    if after_cmd:
        os.system(after_cmd)
def upd_queue(self, data):
    """Apply a play-queue update from the group.

    :param data: Update payload; ``Playlist`` and ``PlayingItemIndex`` are
        required, ``StartPositionTicks`` is optional.

    With nothing playing, a new queue is created and started. Otherwise the
    current queue is replaced: if that changes the video it is started, else
    a "ready" report is sent.
    """
    # It can't hurt to update the queue lol.
    # (A staleness check against data["LastUpdate"] was deliberately left
    # out, so self.playqueue_last_updated is not maintained here.)
    playlist = [
        {"Id": entry["ItemId"], "PlaylistItemId": entry["PlaylistItemId"]}
        for entry in data["Playlist"]
    ]
    playing_index = data["PlayingItemIndex"]

    def start_offset():
        # Start position in seconds, or None when the payload has none.
        ticks = data.get("StartPositionTicks")
        if ticks is None:
            return None
        return ticks / seconds_in_ticks

    if self.playerManager.get_video() is None:
        media = Media(self.client, playlist, playing_index, queue_override=False)
        log.info("The queue update changed the video. (New)")
        offset = start_offset()
        self._play_video(media.video, offset)
        return

    current_media = self.playerManager.get_video().parent
    replacement = current_media.replace_queue(playlist, playing_index)
    if not replacement:
        self._buffer_req(False)
        return

    log.info("The queue update changed the video.")
    offset = start_offset()
    self._play_video(replacement.video, offset)
2020-08-22 20:13:35 +00:00
def schedule_seek(self, when: datetime, position: int):
    """Handle a seek command by scheduling a pause at *position*."""
    # This replicates what the web client does.
    self.schedule_pause(when, position)
def clear_scheduled_command(self):
    """Cancel pending timers and return to normal speed with syncing off."""
    # Order matters: the scheduled play callback assigns self.sync_timeout,
    # so the play timer is stopped before self.sync_timeout is read.
    if self.scheduled_command is not None:
        self.scheduled_command()

    if self.sync_timeout is not None:
        self.sync_timeout()

    self.sync_enabled = False
    self.playerManager.set_speed(1)
2020-08-08 03:56:48 +00:00
def play_request(self):
    """Forward an unpause request via ``client.jellyfin.unpause_sync_play``."""
    self.client.jellyfin.unpause_sync_play()
2020-08-08 03:56:48 +00:00
def pause_request(self):
    """Forward a pause request, then pause the local player right away."""
    self.client.jellyfin.pause_sync_play()
    # Same call as local_pause(); NOTE(review): meaning of the second
    # positional argument is defined by PlayerManager.set_paused - confirm.
    self.playerManager.set_paused(True, True)
def seek_request(self, offset: float):
    """Forward a seek request; *offset* (seconds) is sent as whole ticks."""
    self.client.jellyfin.seek_sync_play(int(offset * seconds_in_ticks))
def local_play(self):
    """Unpause the local player only (nothing is sent to the server here)."""
    self.playerManager.set_paused(False, True)
def local_pause(self):
    """Pause the local player only (nothing is sent to the server here)."""
    self.playerManager.set_paused(True, True)
def local_seek(self, offset: float):
    """Seek the local player to the absolute position *offset* (seconds)."""
    self.playerManager.seek(offset, absolute=True, force=True)
2020-08-22 20:13:35 +00:00
def join_group(self, group_id: str):
    """Forward a join request for *group_id* through the current client."""
    self.client.jellyfin.join_sync_play(group_id)
2021-11-28 18:34:22 +00:00
def discord_join_group(self, join_secret):
    """Enable SyncPlay locally, then join the group in ``join_secret["secret"]``."""
    # from_server=False makes enable_sync_play() pick up the current client.
    self.enable_sync_play(False)
    self.client.jellyfin.join_sync_play(join_secret["secret"])
2020-08-08 03:56:48 +00:00
def menu_join_group(self):
    """Menu callback: join the group attached to the selected menu entry."""
    # menu_action() stores the group dict as the third element of each entry.
    group = self.menu.menu_list[self.menu.menu_selection][2]
    self.menu.hide_menu()
    self.client.jellyfin.join_sync_play(group["GroupId"])
def menu_disable(self):
    """Menu callback: hide the menu and send a leave request."""
    self.menu.hide_menu()
    self.client.jellyfin.leave_sync_play()
def menu_create_group(self):
    """Menu callback: hide the menu and request a new group named after the user."""
    self.menu.hide_menu()
    self.client.jellyfin.new_sync_play_v2(
        _("{0}'s Group").format(clientManager.get_username_from_client(self.client))
    )
2020-08-08 03:56:48 +00:00
def menu_action(self):
    """Build and show the SyncPlay menu for the current client.

    Entries are "None (Disabled)", then "New Group" (only while SyncPlay is
    disabled), then one entry per group returned by the server. The entry of
    the currently joined group is preselected, otherwise the first entry.
    """
    self.client = self.playerManager.get_current_client()

    options = [(_("None (Disabled)"), self.menu_disable, None)]
    if not self.is_enabled():
        options.append((_("New Group"), self.menu_create_group, None))

    # Group rows start right after the fixed entries above.
    first_group_row = len(options)
    selected = 0
    groups = self.client.jellyfin.get_sync_play()
    for row, group in enumerate(groups, start=first_group_row):
        options.append((group["GroupName"], self.menu_join_group, group))
        if group["GroupId"] == self.current_group:
            selected = row

    self.menu.put_menu(_("SyncPlay"), options, selected)
def request_next(self, playlist_item_id):
    """Forward *playlist_item_id* to ``client.jellyfin.next_sync_play``."""
    self.client.jellyfin.next_sync_play(playlist_item_id)
def request_prev(self, playlist_item_id):
    """Forward *playlist_item_id* to ``client.jellyfin.prev_sync_play``."""
    self.client.jellyfin.prev_sync_play(playlist_item_id)
def request_skip(self, playlist_item_id):
    """Forward *playlist_item_id* to ``client.jellyfin.set_item_sync_play``."""
    self.client.jellyfin.set_item_sync_play(playlist_item_id)