Files
rewind/rewind/core.py
Dylan De Faoite d6867c0df3 fix: incorrect progress bar tracking
Hacky workaround that involves working with FFMPEG negative timestamping
2026-02-02 23:58:53 +00:00

231 lines
7.1 KiB
Python

#!/usr/bin/env python3
import os
import datetime
import subprocess
import json
from rewind.state import load_state
from rewind.paths import load_config
from tqdm import tqdm
def clip(seconds_from_end: float) -> None:
clip_output = os.path.expanduser(load_config()["record"]["clip_output"])
os.makedirs(clip_output, exist_ok=True)
output_file_name = f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.mp4"
output_path = os.path.join(clip_output, output_file_name)
start_timestamp = datetime.datetime.now().timestamp() - seconds_from_end
end_timestamp = datetime.datetime.now().timestamp()
length = end_timestamp - start_timestamp
files, start_offset, end_offset = _get_ts_files(
start_timestamp,
end_timestamp
)
_concat_ts_files(files, start_offset, end_offset, length, output_path)
print(f"Created clip: {output_path}")
def save(first_marker: str, second_marker: str):
vod_dir = os.path.expanduser(load_config()["record"]["vod_output"])
os.makedirs(vod_dir, exist_ok=True)
first_timestamp = get_marker_timestamp(first_marker)
second_timestamp = get_marker_timestamp(second_marker)
output_file_name = f"{datetime.datetime.fromtimestamp(first_timestamp).strftime('%Y-%m-%d_%H:%M:%S')}-[{first_marker}-{second_marker}].mp4"
output_path = os.path.join(vod_dir, output_file_name)
if first_timestamp >= second_timestamp:
raise ValueError("First marker must be before second marker")
files, start_offset, end_offset = _get_ts_files(
first_timestamp,
second_timestamp
)
_concat_ts_files(files, start_offset, end_offset, second_timestamp - first_timestamp, output_path)
print(f"Created video file: {output_path}")
def mark(name: str) -> None:
if not name:
raise ValueError("Marker name cannot be empty")
if marker_exists(name):
raise ValueError("Marker name already exists")
# writes marker to json file (not state)
markers_file = os.path.join(os.path.dirname(__file__), "markers.json")
if os.path.exists(markers_file):
with open(markers_file, "r") as f:
markers = json.load(f)
else:
markers = []
markers.append({
"name": name,
"timestamp": datetime.datetime.now().timestamp()
})
with open(markers_file, "w") as f:
json.dump(markers, f, indent=4)
print(f"Added marker: {name}")
def get_marker_timestamp(name: str) -> float:
markers_file = os.path.join(os.path.dirname(__file__), "markers.json")
if not os.path.exists(markers_file):
raise RuntimeError("No markers found")
with open(markers_file, "r") as f:
markers = json.load(f)
for marker in markers:
if marker["name"] == name:
return marker["timestamp"]
raise ValueError("Marker name does not exist")
def print_markers() -> None:
markers_file = os.path.join(os.path.dirname(__file__), "markers.json")
if not os.path.exists(markers_file):
print("No markers found.")
return
with open(markers_file, "r") as f:
markers = json.load(f)
for marker in markers:
format_time = datetime.datetime.fromtimestamp(marker['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
print(f"{format_time} -> {marker['name']}")
def remove_marker(name: str) -> None:
markers_file = os.path.join(os.path.dirname(__file__), "markers.json")
if not os.path.exists(markers_file):
raise RuntimeError("No markers found")
with open(markers_file, "r") as f:
markers = json.load(f)
markers = [m for m in markers if m["name"] != name]
with open(markers_file, "w") as f:
json.dump(markers, f, indent=4)
print(f"Removed marker: {name}")
def marker_exists(name: str) -> bool:
markers_file = os.path.join(os.path.dirname(__file__), "markers.json")
if not os.path.exists(markers_file):
return False
with open(markers_file, "r") as f:
markers = json.load(f)
for marker in markers:
if marker["name"] == name:
return True
return False
"""
Retrieves .ts files recorded between the specified timestamps.
Returns a list of file paths and extra start and end offsets if needed.
get_duration() is used as little as possible since it is slow.
end_timestamp of a file is the start time of the next file.
"""
def _get_ts_files(start_timestamp: float, end_timestamp: float) -> tuple[list[str], float, float]:
ts_files = load_state()["files"]
selected_files = []
start_offset = 0.0
end_offset = 0.0
for i, file_info in enumerate(ts_files):
file_start = file_info["timestamp"]
file_end = ts_files[i + 1]["timestamp"] if i + 1 < len(ts_files) else get_duration(file_info["path"]) + file_start
if file_end <= start_timestamp:
continue
if file_start >= end_timestamp:
break
selected_files.append(file_info["path"])
if file_start <= start_timestamp < file_end:
start_offset = start_timestamp - file_start
if file_start < end_timestamp <= file_end:
end_offset = file_end - end_timestamp
return selected_files, start_offset, end_offset
def _concat_ts_files(file_list: list[str], start_offset: float, end_offset: float, length: float, output_file: str) -> None:
with open("file_list.txt", "w") as f:
for file_path in file_list:
f.write(f"file '{file_path}'\n")
cmd = ["ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-nostats", "-progress", "pipe:1"]
if start_offset > 0:
cmd += ["-ss", str(start_offset)]
if end_offset > 0:
cmd += ["-t", str(length)]
cmd += ["-f", "concat", "-safe", "0", "-i", "file_list.txt", "-c", "copy"]
cmd.append(output_file)
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
)
init_ms_val = 0
init_ms_val_set = False
with tqdm(
total=length,
unit="s",
unit_scale=True,
unit_divisor=60,
desc="Processing",
leave=True,
) as pbar:
for line in process.stdout:
line = line.strip()
if line.startswith("out_time_ms="):
out_time_ms = int(line.split("=")[1])
if not init_ms_val_set:
init_ms_val = out_time_ms
init_ms_val_set = True
out_time_ms -= init_ms_val
seconds = abs(out_time_ms / 1_000_000)
pbar.n = min(seconds, length)
pbar.refresh()
elif line == "progress=end":
break
ret = process.wait()
os.remove("file_list.txt")
if ret != 0:
raise RuntimeError("ffmpeg failed")
def get_duration(file_path: str) -> float:
result = subprocess.run(
["ffprobe", "-v", "error", "-show_entries",
"format=duration", "-of",
"default=noprint_wrappers=1:nokey=1", file_path],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
if result.returncode != 0:
raise RuntimeError(f"ffprobe failed for file {file_path}")
return float(result.stdout)