rewind: make helpers private, clean up marker handling
Rename internal helpers to underscore-prefixed versions and move them closer to their actual call sites. These functions were never meant to be part of the public API. Clean up marker handling by separating concerns properly: printing markers no longer doubles as an existence check, and old markers are cleaned explicitly. Also drop stray debug logging when loading config. No functional change intended, just less confusion and tighter scope.
This commit is contained in:
129
rewind/core.py
129
rewind/core.py
@@ -7,52 +7,6 @@ import json
|
|||||||
from rewind.paths import load_config
|
from rewind.paths import load_config
|
||||||
from rewind.state import load_state
|
from rewind.state import load_state
|
||||||
|
|
||||||
"""
|
|
||||||
Retrieves .ts files recorded between the specified timestamps.
|
|
||||||
Returns a list of file paths and extra start and end offsets if needed.
|
|
||||||
get_duration() is used as little as possible since it is slow.
|
|
||||||
end_timestamp of a file is the start time of the next file.
|
|
||||||
"""
|
|
||||||
def get_ts_files(start_timestamp: float, end_timestamp: float) -> tuple[list[str], float, float]:
|
|
||||||
ts_files = load_state()["files"]
|
|
||||||
selected_files = []
|
|
||||||
start_offset = 0.0
|
|
||||||
end_offset = 0.0
|
|
||||||
|
|
||||||
for i, file_info in enumerate(ts_files):
|
|
||||||
file_start = file_info["timestamp"]
|
|
||||||
file_end = ts_files[i + 1]["timestamp"] if i + 1 < len(ts_files) else get_duration(file_info["path"]) + file_start
|
|
||||||
|
|
||||||
if file_end <= start_timestamp:
|
|
||||||
continue
|
|
||||||
if file_start >= end_timestamp:
|
|
||||||
break
|
|
||||||
|
|
||||||
selected_files.append(file_info["path"])
|
|
||||||
|
|
||||||
if file_start <= start_timestamp < file_end:
|
|
||||||
start_offset = start_timestamp - file_start
|
|
||||||
if file_start < end_timestamp <= file_end:
|
|
||||||
end_offset = file_end - end_timestamp
|
|
||||||
|
|
||||||
return selected_files, start_offset, end_offset
|
|
||||||
|
|
||||||
def concat_ts_files(file_list: list[str], start_offset: float, end_offset: float, length: float, output_file: str) -> None:
|
|
||||||
with open("file_list.txt", "w") as f:
|
|
||||||
for file_path in file_list:
|
|
||||||
f.write(f"file '{file_path}'\n")
|
|
||||||
|
|
||||||
cmd = ["ffmpeg", "-y"]
|
|
||||||
if start_offset > 0:
|
|
||||||
cmd += ["-ss", str(start_offset)]
|
|
||||||
if end_offset > 0:
|
|
||||||
cmd += ["-t", str(length)]
|
|
||||||
cmd += ["-f", "concat", "-safe", "0", "-i", "file_list.txt", "-c", "copy"]
|
|
||||||
cmd.append(output_file)
|
|
||||||
|
|
||||||
subprocess.run(cmd)
|
|
||||||
os.remove("file_list.txt")
|
|
||||||
|
|
||||||
def clip(seconds_from_end: float) -> None:
|
def clip(seconds_from_end: float) -> None:
|
||||||
output_file_name = f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.mp4"
|
output_file_name = f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.mp4"
|
||||||
|
|
||||||
@@ -60,12 +14,12 @@ def clip(seconds_from_end: float) -> None:
|
|||||||
end_timestamp = datetime.datetime.now().timestamp()
|
end_timestamp = datetime.datetime.now().timestamp()
|
||||||
length = end_timestamp - start_timestamp
|
length = end_timestamp - start_timestamp
|
||||||
|
|
||||||
files, start_offset, end_offset = get_ts_files(
|
files, start_offset, end_offset = _get_ts_files(
|
||||||
start_timestamp,
|
start_timestamp,
|
||||||
end_timestamp
|
end_timestamp
|
||||||
)
|
)
|
||||||
|
|
||||||
concat_ts_files(files, start_offset, end_offset, length, output_file_name)
|
_concat_ts_files(files, start_offset, end_offset, length, output_file_name)
|
||||||
print(f"Created clip: {output_file_name}")
|
print(f"Created clip: {output_file_name}")
|
||||||
|
|
||||||
def save(first_marker: str, second_marker: str):
|
def save(first_marker: str, second_marker: str):
|
||||||
@@ -76,12 +30,12 @@ def save(first_marker: str, second_marker: str):
|
|||||||
if first_timestamp >= second_timestamp:
|
if first_timestamp >= second_timestamp:
|
||||||
raise ValueError("First marker must be before second marker")
|
raise ValueError("First marker must be before second marker")
|
||||||
|
|
||||||
files, start_offset, end_offset = get_ts_files(
|
files, start_offset, end_offset = _get_ts_files(
|
||||||
first_timestamp,
|
first_timestamp,
|
||||||
second_timestamp
|
second_timestamp
|
||||||
)
|
)
|
||||||
|
|
||||||
concat_ts_files(files, start_offset, end_offset, second_timestamp - first_timestamp, output_file_name)
|
_concat_ts_files(files, start_offset, end_offset, second_timestamp - first_timestamp, output_file_name)
|
||||||
print(f"Created video file: {output_file_name}")
|
print(f"Created video file: {output_file_name}")
|
||||||
|
|
||||||
def mark(name: str) -> None:
|
def mark(name: str) -> None:
|
||||||
@@ -123,20 +77,21 @@ def get_marker_timestamp(name: str) -> float:
|
|||||||
|
|
||||||
raise ValueError("Marker name does not exist")
|
raise ValueError("Marker name does not exist")
|
||||||
|
|
||||||
def marker_exists(name: str) -> bool:
|
def print_markers() -> None:
|
||||||
|
_clean_old_markers(load_config()["record"]["max_record_time"])
|
||||||
|
|
||||||
markers_file = os.path.join(os.path.dirname(__file__), "markers.json")
|
markers_file = os.path.join(os.path.dirname(__file__), "markers.json")
|
||||||
if not os.path.exists(markers_file):
|
if not os.path.exists(markers_file):
|
||||||
return False
|
print("No markers found.")
|
||||||
|
return
|
||||||
|
|
||||||
with open(markers_file, "r") as f:
|
with open(markers_file, "r") as f:
|
||||||
markers = json.load(f)
|
markers = json.load(f)
|
||||||
|
|
||||||
for marker in markers:
|
for marker in markers:
|
||||||
if marker["name"] == name:
|
format_time = datetime.datetime.fromtimestamp(marker['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
|
||||||
return True
|
print(f"{format_time} -> {marker['name']}")
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
def remove_marker(name: str) -> None:
|
def remove_marker(name: str) -> None:
|
||||||
markers_file = os.path.join(os.path.dirname(__file__), "markers.json")
|
markers_file = os.path.join(os.path.dirname(__file__), "markers.json")
|
||||||
if not os.path.exists(markers_file):
|
if not os.path.exists(markers_file):
|
||||||
@@ -152,22 +107,68 @@ def remove_marker(name: str) -> None:
|
|||||||
|
|
||||||
print(f"Removed marker: {name}")
|
print(f"Removed marker: {name}")
|
||||||
|
|
||||||
def print_markers() -> None:
|
|
||||||
clean_old_markers(load_config()["record"]["max_record_time"])
|
|
||||||
|
|
||||||
|
def marker_exists(name: str) -> bool:
|
||||||
markers_file = os.path.join(os.path.dirname(__file__), "markers.json")
|
markers_file = os.path.join(os.path.dirname(__file__), "markers.json")
|
||||||
if not os.path.exists(markers_file):
|
if not os.path.exists(markers_file):
|
||||||
print("No markers found.")
|
return False
|
||||||
return
|
|
||||||
|
|
||||||
with open(markers_file, "r") as f:
|
with open(markers_file, "r") as f:
|
||||||
markers = json.load(f)
|
markers = json.load(f)
|
||||||
|
|
||||||
for marker in markers:
|
for marker in markers:
|
||||||
format_time = datetime.datetime.fromtimestamp(marker['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
|
if marker["name"] == name:
|
||||||
print(f"{format_time} -> {marker['name']}")
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
def clean_old_markers(max_age_seconds: float) -> None:
|
"""
|
||||||
|
Retrieves .ts files recorded between the specified timestamps.
|
||||||
|
Returns a list of file paths and extra start and end offsets if needed.
|
||||||
|
get_duration() is used as little as possible since it is slow.
|
||||||
|
end_timestamp of a file is the start time of the next file.
|
||||||
|
"""
|
||||||
|
def _get_ts_files(start_timestamp: float, end_timestamp: float) -> tuple[list[str], float, float]:
|
||||||
|
ts_files = load_state()["files"]
|
||||||
|
selected_files = []
|
||||||
|
start_offset = 0.0
|
||||||
|
end_offset = 0.0
|
||||||
|
|
||||||
|
for i, file_info in enumerate(ts_files):
|
||||||
|
file_start = file_info["timestamp"]
|
||||||
|
file_end = ts_files[i + 1]["timestamp"] if i + 1 < len(ts_files) else get_duration(file_info["path"]) + file_start
|
||||||
|
|
||||||
|
if file_end <= start_timestamp:
|
||||||
|
continue
|
||||||
|
if file_start >= end_timestamp:
|
||||||
|
break
|
||||||
|
|
||||||
|
selected_files.append(file_info["path"])
|
||||||
|
|
||||||
|
if file_start <= start_timestamp < file_end:
|
||||||
|
start_offset = start_timestamp - file_start
|
||||||
|
if file_start < end_timestamp <= file_end:
|
||||||
|
end_offset = file_end - end_timestamp
|
||||||
|
|
||||||
|
return selected_files, start_offset, end_offset
|
||||||
|
|
||||||
|
def _concat_ts_files(file_list: list[str], start_offset: float, end_offset: float, length: float, output_file: str) -> None:
|
||||||
|
with open("file_list.txt", "w") as f:
|
||||||
|
for file_path in file_list:
|
||||||
|
f.write(f"file '{file_path}'\n")
|
||||||
|
|
||||||
|
cmd = ["ffmpeg", "-y"]
|
||||||
|
if start_offset > 0:
|
||||||
|
cmd += ["-ss", str(start_offset)]
|
||||||
|
if end_offset > 0:
|
||||||
|
cmd += ["-t", str(length)]
|
||||||
|
cmd += ["-f", "concat", "-safe", "0", "-i", "file_list.txt", "-c", "copy"]
|
||||||
|
cmd.append(output_file)
|
||||||
|
|
||||||
|
subprocess.run(cmd)
|
||||||
|
os.remove("file_list.txt")
|
||||||
|
|
||||||
|
def _clean_old_markers(max_age_seconds: float) -> None:
|
||||||
markers_file = os.path.join(os.path.dirname(__file__), "markers.json")
|
markers_file = os.path.join(os.path.dirname(__file__), "markers.json")
|
||||||
if not os.path.exists(markers_file):
|
if not os.path.exists(markers_file):
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user