dmx/sequence.py
2024-08-07 17:27:25 +05:30

195 lines
6.2 KiB
Python
Executable File

#!/usr/bin/python3
import os
import sys
import json
import subprocess
from time import sleep
from PyDMXControl.controllers import OpenDMXController
from PyDMXControl.profiles.Generic import Dimmer
import traceback
# Field reference for the entries read from sequence.json. This is a bare
# string literal used as a block comment: it is not bound to any name.
'''
# "channel" --> channel/line identifier, order as connected to DMX decoder
# seq -> "sequence" = position (seconds) on 'timeline' starting 0
# fadein / fadeout -> milliseconds
# duration -> seconds
# brightness -> 0-255 (optional entry in json, else use default)
'''
# DEFAULTS -- fallbacks used by the playback loop when an event does not
# carry its own value.
# Brightness handed to the fixture's dim() for "on" events (0-255 range).
default_brightness = 5
# Fade times in milliseconds: fadein applies to "on" events, fadeout to
# "off" events.
default_fadein = 700
default_fadeout = 500
# Startup delay before any file or device access happens.
# NOTE(review): presumably gives the USB DMX adapter / system time to come
# up when the script is launched at boot -- confirm.
print("\n * Wait 15 seconds...")
sleep(15)
# Load the raw sequence definition into `data`. Later sections iterate it as
# a collection of dict entries (one per channel/line).
try:
    base = os.path.expanduser("~/bin/dmx")
    ## COMMENT THIS SECTION WHEN TIMELINE IS FINAL
    #with open(f"{base}/getseq/getspread.py") as getspread:
    #    exec(getspread.read())
    ##
    # Context manager so the file handle is closed even when parsing fails
    # (a bare json.load(open(...)) leaves the handle open).
    with open(f"{base}/sequence.json") as sequence_file:
        data = json.load(sequence_file)
except Exception as argh:
    # Best effort: append the failure to zlog.txt (relative to the current
    # working directory) and carry on. `data` stays unbound in that case, so
    # the sections below that need it will log their own error.
    with open("zlog.txt", "a") as e:
        e.write(" _____\n")
        e.write(str(argh))
        e.write(traceback.format_exc())
        e.write('\n\n')
## ON events
def start_event(channel):
    """Build the "on" event for one sequence entry.

    Args:
        channel: dict describing one entry. "channel" and "seq" are
            required; "brightness" and "fadein" are optional.

    Returns:
        dict with "channel", "event" (always "on") and "seq", plus integer
        "brightness" / "fadein" when the entry supplies a value for them.

    Raises:
        KeyError: if "channel" or "seq" is missing.
        ValueError: if a supplied optional value cannot be parsed as int.
    """
    channel_start = {
        "channel": channel["channel"],
        "event": "on",
        "seq": channel["seq"],
    }
    for key in ("brightness", "fadein"):
        value = channel.get(key)
        # The sequence data can carry blank fields (the timeline builder
        # filters on != ""). A blank optional field means "not set": leave
        # it out so playback falls back to its default instead of crashing
        # on int("").
        if value is None or (isinstance(value, str) and not value.strip()):
            continue
        channel_start[key] = int(value)
    return channel_start
## OFF events
def stop_event(channel):
    """Build the "off" event for one sequence entry.

    The event time is the entry's start ("seq") plus its "duration", i.e. it
    is measured from the start of the timeline.

    Args:
        channel: dict describing one entry. "channel", "seq" and "duration"
            are required; "fadeout" is optional.

    Returns:
        dict with "channel", "event" (always "off") and "seq" (the stop
        time), plus an integer "fadeout" when the entry supplies a value.

    Raises:
        KeyError: if "channel", "seq" or "duration" is missing.
        ValueError: if a supplied "fadeout" cannot be parsed as int.
    """
    channel_stop = {
        "channel": channel["channel"],
        "event": "off",
        # duration from 0
        # NOTE(review): assumes "seq" and "duration" are numbers, not
        # numeric strings (strings would concatenate) -- confirm against
        # the JSON producer.
        "seq": channel["seq"] + channel["duration"],
    }
    fadeout = channel.get("fadeout")
    # A blank "fadeout" means "not set": leave it out so playback uses its
    # default instead of crashing on int("").
    if fadeout is not None and not (isinstance(fadeout, str) and not fadeout.strip()):
        channel_stop["fadeout"] = int(fadeout)
    return channel_stop
## each 'time' (sec) an event occurs
def add_event(events, event):
    """File *event* under its "seq" time in *events*.

    Args:
        events: dict mapping a time (seq) to the list of events at that
            time; it is modified in place.
        event: dict with at least a "seq" key.

    Returns:
        None.
    """
    # First event at this time creates the list; later ones join it.
    events.setdefault(event["seq"], []).append(event)
# Turn the loaded entries into `timeline`: a dict keyed by time (seq), in
# ascending order, whose values are the on/off events due at that time.
try:
    # Only entries that have a non-blank "seq" and "duration" take part.
    data_connected = [
        entry for entry in data
        if entry.get("seq", "") != "" and entry.get("duration", "") != ""
    ]
    ## play order (timeline): ascending "seq"
    sequence = sorted(data_connected, key=lambda entry: entry["seq"])
    ## every entry contributes one "on" and one "off" event
    events = {}
    for entry in sequence:
        for event in (start_event(entry), stop_event(entry)):
            # filed under the time at which the event fires
            add_event(events, event)
    # event times (dict keys) in chronological order
    chrono = sorted(events)
    # rebuild the dict so iteration follows the chronological keys
    timeline = {moment: events[moment] for moment in chrono}
    # keep a copy on disk (current working directory) for inspection
    with open('timeline.json', 'w') as timeline_file:
        json.dump(timeline, timeline_file, indent=2)
except Exception as argh:
    # Append the failure to zlog.txt and carry on.
    with open("zlog.txt", "a") as log:
        log.writelines((
            " _____\n",
            str(argh),
            traceback.format_exc(),
            '\n\n',
        ))
# Reset the USB device named "FT232R USB UART" before the DMX controller is
# opened.
# NOTE(review): presumably this is the USB-DMX interface -- confirm.
try:
    print("\n * Reset USB device")
    # subprocess.call returns the command's exit status. A failed reset used
    # to pass silently; raise so it lands in zlog.txt like every other
    # startup failure. The script still continues afterwards.
    status = subprocess.call(['usbreset', "FT232R USB UART"])
    if status != 0:
        raise RuntimeError(f"usbreset exited with status {status}")
except Exception as argh:
    # Covers both a missing usbreset binary and a non-zero exit status.
    with open("zlog.txt", "a") as e:
        e.write(" _____\n")
        e.write(str(argh))
        e.write(traceback.format_exc())
        e.write('\n\n')
# Open the DMX controller and register one Dimmer fixture per distinct
# "channel" value found in the sequence data.
try:
    print("\n ** DMX start...")
    dmx = OpenDMXController()
    ## entries ordered by "channel" -> channel/line number
    lines = sorted(data, key=lambda entry: entry["channel"])
    ## each channel is added to DMX once, in that order --> [ channels ]
    channels = []
    for name in (entry['channel'] for entry in lines):
        if name in channels:
            # already registered by an earlier entry
            continue
        channels.append(name)
        # the channel identifier doubles as the fixture name, which is how
        # the playback loop looks fixtures up again
        line = dmx.add_fixture(Dimmer, name=name)
    #print(channels)
    #dmx.web_control()
except Exception as argh:
    # Append the failure to zlog.txt and carry on.
    with open("zlog.txt", "a") as log:
        log.writelines((
            " _____\n",
            str(argh),
            traceback.format_exc(),
            '\n\n',
        ))
# Play the timeline in an endless loop. Ctrl-C fades everything out and
# exits; any other error is logged to zlog.txt and the controller is closed.
try:
    # Event times in play order (timeline was built with ascending keys).
    times = list(timeline)
    # loop 'forever'
    while True:
        print()
        # begin timeline
        print(" * Start timeline...")
        # time (seq) of the previous event group; every pass restarts at 0
        t = 0
        # for every t (seq)
        for position, i in enumerate(times):
            # Hold until this group's time, counted from the last event.
            # The exact difference is used (no int() truncation) so that
            # fractional seq values cannot yield a short or negative sleep.
            if i > t:
                sleep(i - t)
            # for all events at t=i (seq)
            for e in timeline[i]:
                # fixtures were registered under their channel identifier
                line = dmx.get_fixtures_by_name(e["channel"])[0]
                if e["event"] == "on":
                    # fadein value from json if there, else default
                    fadein = e.get("fadein", default_fadein)
                    # dim value from json if there, else default
                    bright = e.get("brightness", default_brightness)
                    print(f" - {e['seq']}s | #{e['channel']} {e['event'].upper()} (brightness: {bright}, fadein: {fadein})")
                    line.dim(bright, fadein)
                elif e["event"] == "off":
                    # fadeout value from json if there, else default
                    fadeout = e.get("fadeout", default_fadeout)
                    print(f" - {e['seq']}s | #{e['channel']} {e['event'].upper()} (fadeout: {fadeout})")
                    line.dim(0, fadeout)
                    #sleep(fadeout/1000 + 2) ## to prevent crossfade
            # now this is previous t for next event
            t = i
            # print to stdout next event info
            if position + 1 < len(times):
                next_t = times[position + 1]
                tt = next_t - i
                # "\\" is a literal backslash; a bare "\ " is an invalid
                # escape sequence and triggers a warning on current Python.
                next_event = f" \\ Next event [seq={next_t}] in {tt}s"
            else:
                next_event = " - Last event of sequence"
            print(next_event)
except KeyboardInterrupt:
    print()
    # Fade out every channel that appears in the timeline, not just the
    # group that fired last: channels switched on by earlier groups may
    # still be lit, and the loop variable may not even be set yet.
    for name in {e["channel"] for group in timeline.values() for e in group}:
        dmx.get_fixtures_by_name(name)[0].dim(0, 1000)
    # give the 1000 ms fade time to finish before shutting down
    sleep(2)
    dmx.close()
    sys.exit()
except Exception as argh:
    with open("zlog.txt", "a") as e:
        e.write(" _____\n")
        e.write(str(argh))
        e.write(traceback.format_exc())
        e.write('\n\n')
    try:
        dmx.close()
    except NameError:
        # the controller was never created (DMX start failed earlier), so
        # there is nothing to close
        pass