200 lines
6.3 KiB
Python
200 lines
6.3 KiB
Python
|
#!/usr/bin/python3
|
||
|
|
||
|
import os
|
||
|
import sys
|
||
|
import json
|
||
|
import subprocess
|
||
|
from time import sleep
|
||
|
import traceback
|
||
|
#from PyDMXControl.controllers import OpenDMXController
|
||
|
#from PyDMXControl.profiles.Generic import Dimmer
|
||
|
|
||
|
'''
|
||
|
# "channel" --> channel/line identifier, order as connected to DMX decoder
|
||
|
# seq -> "sequence" = position (seconds) on 'timeline' starting 0
|
||
|
# fadein / fadeout -> milliseconds
|
||
|
# duration -> seconds
|
||
|
# brightness -> 0-255 (optional entry in json, else use default)
|
||
|
'''
|
||
|
|
||
|
base = os.path.expanduser("~/bin/dmx")
|
||
|
## COMMENT THIS SECTION WHEN TIMELINE IS FINAL
|
||
|
with open(f"{base}/getseq/getspread.py") as getspread:
|
||
|
exec(getspread.read())
|
||
|
##
|
||
|
|
||
|
# DEFAULTS
|
||
|
# 0-255 dim / brightness range
|
||
|
default_brightness = 10
|
||
|
# fadein, fadeout (milliseconds)
|
||
|
default_fadein = 1000
|
||
|
default_fadeout = 1000
|
||
|
## if "start' not entered in 'start_opt' column, default start is 0
|
||
|
## subtract start_opt_time from on and off events
|
||
|
start_opt_time = 0
|
||
|
|
||
|
data = json.load(open('sequence.json'))
|
||
|
|
||
|
## ON events
|
||
|
def start_event(channel):
|
||
|
channel_start = {}
|
||
|
channel_start["channel"] = channel["channel"]
|
||
|
channel_start["event"] = "on"
|
||
|
channel_start["seq"] = channel["seq"] - start_opt_time
|
||
|
if "brightness" in channel:
|
||
|
channel_start["brightness"] = int(channel["brightness"])
|
||
|
if "fadein" in channel:
|
||
|
channel_start["fadein"] = int(channel["fadein"])
|
||
|
return channel_start
|
||
|
|
||
|
## OFF events
|
||
|
def stop_event(channel):
|
||
|
channel_stop = {}
|
||
|
channel_stop["channel"] = channel["channel"]
|
||
|
channel_stop["event"] = "off"
|
||
|
# duration from 0
|
||
|
stop_time = channel["seq"] + channel["duration"] - start_opt_time
|
||
|
channel_stop["seq"] = stop_time
|
||
|
if "fadeout" in channel:
|
||
|
channel_stop["fadeout"] = int(channel["fadeout"])
|
||
|
return channel_stop
|
||
|
|
||
|
## each 'time' (sec) an event occurs
|
||
|
def add_event(events_dict, event):
|
||
|
if event["seq"] not in events:
|
||
|
events_dict[event["seq"]] = []
|
||
|
# add start/on and stop/off events to respective times
|
||
|
events_dict[event["seq"]].append(event)
|
||
|
return
|
||
|
|
||
|
data_connected = [ l for l in data \
|
||
|
if ("seq" in l.keys() and l["seq"]!="") \
|
||
|
and ("duration" in l.keys() and l["duration"] != "") ]
|
||
|
## sort by sequence -> play in this order (timeline)
|
||
|
sequence = sorted(data_connected, key=lambda x: x["seq"])
|
||
|
# print(sequence)
|
||
|
|
||
|
# start_opt_time -> if 'start' in calc, start playing from that point
|
||
|
for i in sequence:
|
||
|
if "start_opt" in i:
|
||
|
start_opt_time = int(i['seq'])
|
||
|
break
|
||
|
|
||
|
## generate all (on / off) events
|
||
|
events = {}
|
||
|
for i in sequence:
|
||
|
i_start = start_event(i)
|
||
|
i_stop = stop_event(i)
|
||
|
# add start/on and stop/off events to respective 'times'
|
||
|
add_event(events, i_start)
|
||
|
add_event(events, i_stop)
|
||
|
|
||
|
# chronologically sort event KEYS (time in sec / seq )
|
||
|
chrono = list(events.keys())
|
||
|
chrono.sort()
|
||
|
|
||
|
# chronological timeline with events (using sorted KEYS/time)
|
||
|
timeline = { x: events[x] for x in chrono if x >= 0 }
|
||
|
# if seq==0 and "event"=="off" -> remove / don't add
|
||
|
for e in timeline[0]:
|
||
|
if e["event"] == "off":
|
||
|
timeline[0].remove(e)
|
||
|
|
||
|
# MID-START fix for 'previous-ONs' - on before start
|
||
|
channs = []
|
||
|
for t in timeline:
|
||
|
for c in timeline[t]:
|
||
|
if c['event'] == 'on':
|
||
|
channs.append(c['channel'])
|
||
|
if c['event'] == 'off':
|
||
|
if c['channel'] not in channs:
|
||
|
channs.append(c['channel'])
|
||
|
else:
|
||
|
channs.remove(c['channel'])
|
||
|
|
||
|
##print(channs)
|
||
|
for ch in channs:
|
||
|
timeline[0].append({"channel": ch, "event": "on", "seq": 0})
|
||
|
|
||
|
with open('timeline.json','w') as f:
|
||
|
json.dump(timeline, f, indent=2)
|
||
|
|
||
|
# wait for things to settle...
|
||
|
print("\n * Wait 3 seconds...")
|
||
|
sleep(3)
|
||
|
|
||
|
print("\n * Reset USB device")
|
||
|
subprocess.call(['usbreset', "FT232R USB UART"])
|
||
|
|
||
|
print("\n ** DMX start...")
|
||
|
##dmx = OpenDMXController()
|
||
|
|
||
|
## sort by "channel" -> channel/line number
|
||
|
lines = sorted(data, key=lambda x: x["channel"])
|
||
|
## remove dupes and add lights to DMX once in this order --> [ channels ]
|
||
|
channels = []
|
||
|
for l in lines:
|
||
|
if l['channel'] not in channels:
|
||
|
channels.append(l['channel'])
|
||
|
##line = dmx.add_fixture(Dimmer, name=l['channel'])
|
||
|
#print(channels)
|
||
|
#dmx.web_control()
|
||
|
|
||
|
try:
|
||
|
# loop 'forever'
|
||
|
while 1 > 0:
|
||
|
print()
|
||
|
# begin timeline
|
||
|
print(" * Start timeline...")
|
||
|
print(f" - start at seq: {start_opt_time}")
|
||
|
t = 0
|
||
|
# for every t (seq)
|
||
|
for i in timeline:
|
||
|
# print(i, timeline[i])
|
||
|
if int(i) > 0:
|
||
|
# hold, count time from last event
|
||
|
sleep(int(i) - t)
|
||
|
for e in timeline[i]:
|
||
|
# for all events at t=i (seq)
|
||
|
##line = dmx.get_fixtures_by_name(e["channel"])[0]
|
||
|
if e["event"] == "on":
|
||
|
# fadein value from json if there, else default
|
||
|
fadein = e.get("fadein", default_fadein)
|
||
|
# dim value from json if there, else default
|
||
|
bright = e.get("brightness", default_brightness)
|
||
|
print(f" - {e['seq']}s | #{e['channel']} {e['event'].upper()} (brightness: {bright}, fadein: {fadein})")
|
||
|
##line.dim(bright, fadein)
|
||
|
if e["event"] == "off":
|
||
|
# fadeout value from json if there, else default
|
||
|
fadeout = e.get("fadeout", default_fadeout)
|
||
|
print(f" - {e['seq']}s | #{e['channel']} {e['event'].upper()} (fadeout: {fadeout})")
|
||
|
##line.dim(0, fadeout)
|
||
|
# now this is previous t for next event
|
||
|
t = i
|
||
|
# print to stdout next event info ## use chrono (keys list)
|
||
|
if i != chrono[-1]:
|
||
|
next_t = chrono[chrono.index(i)+1]
|
||
|
tt = next_t-int(i)
|
||
|
next_event = f" \ Next event [seq={next_t}] in {tt}s"
|
||
|
else:
|
||
|
next_event = " - Last event of sequence"
|
||
|
print(next_event)
|
||
|
except KeyboardInterrupt:
|
||
|
print()
|
||
|
for e in timeline[i]:
|
||
|
print(e["channel"]) # for all events at t=i (seq)
|
||
|
##line = dmx.get_fixtures_by_name(e["channel"])[0]
|
||
|
##line.dim(0, 1000)
|
||
|
sleep(2)
|
||
|
##dmx.close()
|
||
|
sys.exit()
|
||
|
except Exception as argh:
|
||
|
with open("zlog.txt", "a") as e:
|
||
|
e.write(" _____")
|
||
|
e.write(str(argh))
|
||
|
e.write(traceback.format_exc())
|
||
|
e.write('\n\n')
|
||
|
|
||
|
##dmx.close()
|
||
|
|