ghost-mirror/ghost_trigger.py

95 lines
No EOL
3.1 KiB
Python

import os
import yaml
import random
import subprocess
import time
from gpiozero import MotionSensor
from signal import pause
# --- Načtení konfigurace ---
CONFIG_PATH = os.path.join(os.path.dirname(__file__), "config/config.yaml")
config = {}
try:
with open(CONFIG_PATH, 'r') as f:
config = yaml.safe_load(f)
except Exception:
exit()
PIR_PIN = config.get('pir_pin', 17)
VIDEOS_DIR = os.path.join(os.path.dirname(__file__), config.get('videos_dir', 'videos'))
BLACK_IMAGE_PATH = os.path.join(os.path.dirname(__file__), "black.png")
VIDEO_EXTENSIONS = tuple(config.get('video_extensions', ['.mp4', '.mkv']))
MPV_SOCKET_PATH = "/tmp/mpvsocket" # Cesta k ovládacímu socketu
available_videos = []
# --- Nové funkce pro ovládání mpv ---
def send_mpv_command(command_json):
"""Pošle JSON příkaz do běžícího mpv přes socat."""
full_command = f"echo '{command_json}' | socat - {MPV_SOCKET_PATH}"
try:
subprocess.run(full_command, shell=True, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError:
# Chybu ignorujeme, pokud socket není připraven, další pokus proběhne při příštím pohybu
pass
# --- Hlavní logika ---
def load_videos():
"""Načte seznam video souborů."""
global available_videos
try:
files = os.listdir(VIDEOS_DIR)
available_videos = [os.path.join(VIDEOS_DIR, f) for f in files if f.lower().endswith(VIDEO_EXTENSIONS)]
except Exception:
available_videos = []
def play_random_ghost():
"""Pošle mpv příkaz k přehrání videa JEDNOU a návratu k černé obrazovce."""
print("Pohyb detekován! Posílám příkaz do mpv...")
load_videos()
if not available_videos:
return
video_to_play = random.choice(available_videos)
try:
# Příkaz č. 1: Načti a přehraj video, ale s vypnutou smyčkou (loop=no)
command1 = f'{{"command": ["loadfile", "{video_to_play}", "replace", "loop=no"]}}'
send_mpv_command(command1)
# Příkaz č. 2: Po skončení videa se vrať k zobrazení černého obrázku
command2 = f'{{"command": ["loadfile", "{BLACK_IMAGE_PATH}", "append-play"]}}'
send_mpv_command(command2)
except Exception:
pass
# --- Hlavní spuštění ---
# Ukončíme všechny staré instance mpv pro čistý start
os.system("pkill -f mpv")
time.sleep(1)
load_videos()
# Spustíme mpv na pozadí s otevřeným ovládacím socketem
subprocess.Popen([
"/usr/bin/mpv",
"--no-audio",
"--fullscreen",
"--loop-file=inf",
"--idle", # Zůstane aktivní, i když nic nehraje
f"--input-ipc-server={MPV_SOCKET_PATH}", # Klíčový parametr pro ovládání
BLACK_IMAGE_PATH
])
time.sleep(2) # Dáme mpv čas na spuštění a vytvoření socketu
# Nastavení senzoru pohybu
pir = MotionSensor(PIR_PIN)
pir.when_motion = play_random_ghost
print("Aplikace ducha běží, mpv je ovládáno na dálku. Čekám na pohyb...")
try:
pause()
except KeyboardInterrupt:
print("Ukončuji aplikaci.")
os.system("pkill -f mpv") # Při ukončení uklidíme