406 lines
14 KiB
Python
406 lines
14 KiB
Python
#!/usr/bin/python3
|
|
import json
|
|
import time
|
|
import xml.etree.ElementTree
|
|
|
|
import requests
|
|
from requests.auth import HTTPDigestAuth
|
|
|
|
|
|
def PTZData(d, root="PTZData"):
    """Serialize *d* into the line-oriented XML text sent as a request body.

    Args:
        d: A mapping of tag name to value, or a list of such mappings.
            Mapping values may be scalars (rendered with ``str``), nested
            mappings (rendered as child elements) or lists (the tag is
            repeated once per item).
        root: Name of the wrapping element.  A falsy value (``None`` or
            ``""``) emits the children without any wrapper.

    Returns:
        The XML text; every closing tag is followed by a newline.

    Raises:
        TypeError: If *d* is a list containing something other than
            mappings (there would be no tag name to use for the item).
    """
    # Imported locally so the block does not depend on a file-level import.
    from xml.sax.saxutils import escape

    def element(tag, value):
        # Nested mappings become child elements starting on their own line;
        # everything else is text and must have &, < and > escaped.
        if isinstance(value, dict):
            body = '\n' + PTZData(value, None)
        else:
            body = escape(str(value))
        return '<' + tag + '>' + body + '</' + tag + '>\n'

    parts = []
    if root:
        parts.append('<' + root + '>\n')

    if isinstance(d, list):
        # A top-level list has no tag names of its own, so each item must be
        # a mapping whose elements are emitted in sequence.
        for item in d:
            if not isinstance(item, dict):
                raise TypeError(
                    'PTZData list items must be dicts, got %s' % type(item).__name__)
            parts.append(PTZData(item, None))
    else:
        for key, value in d.items():
            if isinstance(value, list):
                parts.extend(element(key, item) for item in value)
            else:
                parts.append(element(key, value))

    if root:
        parts.append('</' + root + '>\n')
    return ''.join(parts)
|
|
|
|
def etree_to_list(t):
    """Convert the direct children of element *t* with ``etree_to_dict``.

    Args:
        t: An ``xml.etree.ElementTree.Element``.

    Returns:
        A list with one dict per child element.  When *t* has exactly one
        child, that child's dict is returned on its own instead of a
        one-item list (historical behaviour, kept for existing callers).
    """
    # Iterating an Element yields its direct children; the older
    # Element.getchildren() accessor no longer exists on Python 3.9+.
    children = [etree_to_dict(child) for child in t]
    if len(children) == 1:
        return children[0]
    return children
|
|
|
|
def etree_to_dict(t):
    """Convert element *t* and its descendants into nested dicts.

    The element's tag, with any ``{namespace}`` prefix removed, becomes the
    single key of the returned dict (plus ``'text'``, see below).

    * A leaf element maps its tag to ``t.text`` (which may be ``None``).
    * An element with exactly one child maps its tag to that child's dict
      unchanged.
    * An element with several children maps its tag to the merge of the
      children's dicts; while merging, string values that look like
      integers (optionally preceded by one sign character) are converted
      to ``int``.  Later children overwrite earlier ones with the same key.

    If the element has children and also carries non-whitespace text, the
    text is stored under the extra key ``'text'``.

    Args:
        t: An ``xml.etree.ElementTree.Element``.

    Returns:
        The dict described above.
    """
    tag = t.tag.split('}')[-1]
    # Iterating an Element yields its direct children; the older
    # Element.getchildren() accessor no longer exists on Python 3.9+.
    children = [etree_to_dict(child) for child in t]

    if len(children) == 1:
        value = children[0]
    else:
        value = {}
        for child in children:
            for key, item in child.items():
                if isinstance(item, str) and (item.isdigit() or item[1:].isdigit()):
                    # The check above only inspects the tail of the string,
                    # so values such as "a12" get here too; keep those as
                    # text instead of letting int() raise.
                    try:
                        item = int(item)
                    except ValueError:
                        pass
                value[key] = item

    result = {tag: value}
    if not value:
        result[tag] = t.text
    elif (t.text or '').strip():
        # t.text is None when the first child follows the start tag directly.
        result['text'] = t.text
    return result
|
|
|
|
def is_close_enough(a, b, tolerance=2):
    """Return True when positions *a* and *b* agree on every axis.

    Args:
        a: Mapping with numeric ``'elevation'``, ``'absoluteZoom'`` and
            ``'azimuth'`` entries.
        b: Mapping with the same three entries.
        tolerance: Two values on an axis count as equal when their absolute
            difference is strictly smaller than this.  Defaults to 2.

    Returns:
        ``True`` if all three axes are within *tolerance*, else ``False``.

    Raises:
        KeyError: If either mapping lacks one of the three entries.
    """
    return all(
        abs(a[axis] - b[axis]) < tolerance
        for axis in ('elevation', 'absoluteZoom', 'azimuth')
    )
|
|
|
|
|
|
class Camera:
    """HTTP client for one PTZ camera channel under ``/ISAPI/PTZCtrl``.

    Requests use HTTP digest authentication.  Movement commands are dicts
    with ``pan``, ``tilt`` and ``zoom`` entries; positions are dicts with
    ``elevation``, ``azimuth`` and ``absoluteZoom`` entries as reported by
    the camera's ``status`` resource.

    Setting ``abort`` to a true value makes the long-running methods
    (``sequence``, ``goto``, ``goto_preset``, ``fast_preset``) stop at their
    next check.
    """

    # Ready-made movement commands for ``continuous`` / ``momentary``.
    STOP = {'pan': 0, 'tilt': 0, 'zoom': 0}
    LEFT = {'pan': -1, 'tilt': 0, 'zoom': 0}
    RIGHT = {'pan': 1, 'tilt': 0, 'zoom': 0}
    UP = {'pan': 0, 'tilt': 1, 'zoom': 0}
    DOWN = {'pan': 0, 'tilt': -1, 'zoom': 0}
    LEFT_UP = {'pan': -1, 'tilt': 1, 'zoom': 0}
    LEFT_DOWN = {'pan': -1, 'tilt': -1, 'zoom': 0}
    RIGHT_UP = {'pan': 1, 'tilt': 1, 'zoom': 0}
    RIGHT_DOWN = {'pan': 1, 'tilt': -1, 'zoom': 0}

    # NOTE(review): both zoom commands also carry pan=1, so they pan while
    # zooming.  That looks unintended, but the values are kept as they were;
    # confirm before changing.
    IN = {'pan': 1, 'tilt': 0, 'zoom': 1}
    OUT = {'pan': 1, 'tilt': 0, 'zoom': -1}

    # Fragment that a response body contains when the request succeeded.
    _STATUS_OK = '<statusCode>1</statusCode>'
    # Preset ids that get_presets() leaves out of its result.
    _SYSTEM_PRESETS = tuple(range(33, 47)) + tuple(range(90, 106))

    # Class-level defaults.  __init__ replaces the two dicts with
    # per-instance objects so cameras do not share state.
    abort = False
    sequence_time = 0
    sequence_start = 0
    segment_times = {}
    next_target = None
    last_position = {}
    timeout = None

    def __init__(self, ip='192.168.1.64', username='admin', password='admin', channel=1, timeout=None):
        """Store the connection settings.

        Args:
            ip: Host part of the camera URL.
            username: Digest-auth user name.
            password: Digest-auth password.
            channel: Channel number inserted into every request URL.
            timeout: Passed to ``requests`` as the request timeout; ``None``
                (the default) waits indefinitely, as before.
        """
        self.ip = ip
        self.username = username
        self.password = password
        self.channel = channel
        self.timeout = timeout
        self.auth = HTTPDigestAuth(self.username, self.password)
        # Fresh dicts per instance: sequence() and status() write into them.
        self.segment_times = {}
        self.last_position = {}

    def _succeeded(self, response):
        """Return True if *response* reports success; otherwise print it."""
        if self._STATUS_OK in response:
            return True
        print(response)
        return False

    def put_aux(self, cmd):
        """Send *cmd* as a ``PTZAux`` document to ``auxcontrols/1``.

        Returns:
            The response body text.
        """
        return self.put('auxcontrols/1', PTZData(cmd, root='PTZAux'))

    def wipe(self):
        """Switch auxiliary control 1 (type ``WIPER``) on.

        Returns:
            The response body text.
        """
        return self.put_aux({
            'id': '1',
            'type': 'WIPER',
            'status': 'on',
        })

    def url(self, method):
        """Return the full request URL for the PTZ resource *method*."""
        return 'http://{}/ISAPI/PTZCtrl/channels/{}/{}'.format(self.ip, self.channel, method)

    def parse_xml(self, data):
        """Parse XML text *data* into nested dicts via ``etree_to_dict``."""
        return etree_to_dict(xml.etree.ElementTree.fromstring(data))

    def status(self):
        """Fetch the current position and cache it in ``last_position``.

        Returns:
            The ``AbsoluteHigh`` dict of the ``PTZStatus`` response, or an
            empty dict when the response has no ``PTZStatus`` element.
        """
        parsed = self.parse_xml(self.get('status'))
        if 'PTZStatus' in parsed:
            self.last_position = parsed['PTZStatus']['AbsoluteHigh']
            return self.last_position
        return {}

    def get(self, method):
        """GET the PTZ resource *method* and return the response text."""
        return requests.get(self.url(method), auth=self.auth, timeout=self.timeout).text

    def put(self, method, data):
        """PUT *data* to the PTZ resource *method* and return the response text.

        A dict is serialized with ``PTZData`` first; anything else is sent
        as given.
        """
        if isinstance(data, dict):
            data = PTZData(data)
        return requests.put(self.url(method), data=data, auth=self.auth, timeout=self.timeout).text

    def momentary(self, cmd, duration=None):
        """Send movement *cmd* to the ``momentary`` resource.

        Args:
            cmd: Movement command dict.  It is not modified.
            duration: Optional duration in seconds; sent as integer
                milliseconds in a ``Momentary`` element.
        """
        if duration:
            # Work on a copy so shared constants such as Camera.LEFT are not
            # permanently extended with a Momentary entry.
            cmd = dict(cmd, Momentary={'duration': int(duration * 1000)})
        self._succeeded(self.put('momentary', cmd))

    def continuous(self, cmd, duration=None):
        """Send movement *cmd* to the ``continuous`` resource.

        Args:
            cmd: Movement command dict.
            duration: If given and the request succeeded, sleep that many
                seconds and then send ``STOP``.
        """
        if self._succeeded(self.put('continuous', cmd)) and duration:
            time.sleep(duration)
            self.continuous(self.STOP)

    def set_preset(self, id, name):
        """Store a preset called *name* under *id* via ``presets/<id>``."""
        data = PTZData({
            'id': str(id),
            'presetName': name,
        }, 'PTZPreset')
        self._succeeded(self.put('presets/%s' % id, data))

    def set_presets(self, presets):
        """Move to each preset's position and store it on the camera.

        Args:
            presets: Iterable of dicts with ``id``, ``name`` and ``position``
                (keyword arguments for ``absolute`` / ``goto``).
        """
        for preset in presets:
            self.absolute(**preset['position'])
            time.sleep(5)
            self.goto(**preset['position'])
            self.set_preset(preset['id'], preset['name'])
            time.sleep(1)
        print('presets loaded')

    def sequence(self, steps=None, speed=12, goto_first=True, loop=False):
        """Run through *steps*, recording timing information.

        Args:
            steps: Sequence of steps.  A step is either a preset id/name or
                a dict with optional keys ``preset``, ``fast``, ``speed``,
                ``zoom``, ``zoom_last``, ``sleep`` and ``seqid``.
            speed: Pan/tilt speed for steps that do not set their own.
            goto_first: Jump to the first step with ``fast_preset`` before
                timing starts; the remaining steps form the timed part.
            loop: Repeat until ``abort`` is set.

        ``sequence_start``, ``sequence_time``, ``segment_times`` (keyed by a
        step's ``seqid``) and ``next_target`` are updated along the way.
        """
        self.sequence_start = 0
        steps = list(steps) if steps else []
        if not steps:
            # Nothing to run: avoid indexing an empty list (and, with
            # loop=True, spinning forever without doing anything).
            self.sequence_time = 0
            self.next_target = None
            return

        if goto_first:
            first = steps[0]
            steps = steps[1:]

        while True:
            if goto_first:
                if isinstance(first, dict):
                    if 'seqid' in first and first['seqid'] not in self.segment_times:
                        self.segment_times[first['seqid']] = 0
                    first = first['preset']
                self.fast_preset(first, True)
                if self.abort:
                    return
                time.sleep(3)

            self.sequence_start = t0 = time.time()
            for step in steps:
                self.next_target = step
                segment_t0 = time.time()
                if self.abort:
                    return
                if isinstance(step, dict):
                    if 'fast' in step or ('speed' in step and step['speed'] > 100):
                        self.fast_preset(step.get('fast', step.get('preset')), True)
                    elif 'preset' in step:
                        # Fall back to the sequence-wide speed when the step
                        # does not carry its own.
                        step_speed = step.get('speed', speed)
                        kwargs = {
                            'pan': step_speed,
                            'tilt': step_speed,
                        }
                        if step.get('zoom'):
                            kwargs['zoom'] = step['zoom']
                        if 'zoom_last' in step:
                            kwargs['zoom_last'] = step['zoom_last']
                        self.goto_preset(step['preset'], **kwargs)
                    # NOTE(review): the original nesting of this pause was
                    # lost; it is applied to every dict step here - confirm.
                    if 'sleep' in step:
                        if self.abort:
                            return
                        time.sleep(float(step['sleep']))
                else:
                    self.goto_preset(step, pan=speed, tilt=speed)

                segment_time = time.time() - segment_t0
                if isinstance(step, dict) and 'seqid' in step:
                    self.segment_times[step['seqid']] = segment_time

            self.sequence_time = time.time() - t0
            self.next_target = None
            if not loop:
                break

    def fast_preset(self, id, wait=False):
        """Ask the camera to jump to preset *id* via ``presets/<id>/goto``.

        Args:
            id: Preset id placed in the URL.
            wait: If true, poll ``status`` every 0.1 s until the reported
                position is close to the stored preset position
                (``is_close_enough``) or ``abort`` is set.
        """
        self.put('presets/%s/goto' % id, None)
        if not wait:
            return
        preset = self.get_preset(id)['position']
        last = self.status()
        # An empty status (no PTZStatus in the response) counts as "not
        # there yet" rather than crashing the comparison.
        while not (last and is_close_enough(last, preset)):
            if self.abort:
                return
            time.sleep(0.1)
            last = self.status()

    def goto_preset(self, id, pan, tilt, zoom=1, zoom_last=False):
        """Drive to the stored position of preset *id* using ``goto``.

        Args:
            id: Preset name (``str``) or preset id.
            pan: Pan speed passed to ``goto``.
            tilt: Tilt speed passed to ``goto``.
            zoom: Zoom speed passed to ``goto``.
            zoom_last: Passed to ``goto``.

        Returns:
            Whatever ``goto`` returns, or ``None`` when ``abort`` is set.

        Raises:
            Exception: If no preset matches *id*.
        """
        if self.abort:
            return
        preset = self.get_preset(id)
        if isinstance(id, str):
            id = preset['id']
        position = preset['position']
        # Target offsets carried over from the original code.
        # NOTE(review): indentation was lost in this file; the azimuth
        # offset is assumed to be unconditional - confirm.
        if position['elevation'] < 0:
            position['elevation'] -= 1
        position['azimuth'] += 1
        print('goto preset', id, position['elevation'], position['azimuth'], position['absoluteZoom'])
        return self.goto(position['elevation'], position['azimuth'], position['absoluteZoom'],
                         pan=pan, tilt=tilt, zoom=zoom, zoom_last=zoom_last)

    def goto(self, elevation, azimuth, absoluteZoom, speed=1, pan=None, tilt=None, zoom=None, zoom_last=False):
        """Drive to a position with continuous moves, polling ``status``.

        A continuous move is started towards the target.  Every 0.1 s the
        position is polled; an axis is stopped once its remaining delta is
        zero or has changed sign.  The method returns when all axes have
        stopped or ``abort`` is set.

        Args:
            elevation: Target elevation.
            azimuth: Target azimuth.
            absoluteZoom: Target zoom.
            speed: Default pan/tilt speed.
            pan: Pan speed; defaults to *speed*.
            tilt: Tilt speed; defaults to *speed*.
            zoom: Zoom speed; defaults to 1.
            zoom_last: Keep the current zoom during the first pass, then run
                a second pass with the requested zoom.

        Returns:
            ``(current, delta)`` from a final status poll, or ``None`` when
            aborted.

        Raises:
            KeyError: If the initial status poll yields no position.
        """
        t0 = time.time()
        start = self.status()
        target = {'elevation': elevation, 'azimuth': azimuth, 'absoluteZoom': absoluteZoom}
        print('start:', start, 'target:', target)

        if zoom_last:
            # Keep the full request for the second pass, then hold zoom at
            # its current value for this pass.
            goto_last = {
                'speed': speed,
                'pan': pan,
                'tilt': tilt,
                'zoom': zoom,
            }
            goto_last.update(target)
            target['absoluteZoom'] = start['absoluteZoom']

        def get_delta():
            # Remaining distance per axis; keys the status reports beyond
            # the three target axes are ignored.
            current = self.status()
            delta = {key: target[key] - current[key] for key in current if key in target}
            return current, delta

        current, delta = get_delta()

        if pan is None:
            pan = speed
        if tilt is None:
            tilt = speed
        if zoom is None:
            zoom = 1
        # Sign conventions as in the original: a positive elevation delta
        # needs a negative tilt.
        if delta['elevation'] > 0:
            tilt = -tilt
        if delta['azimuth'] < 0:
            pan = -pan
        if delta['absoluteZoom'] < 0:
            zoom = -zoom
        if pan and not delta['azimuth']:
            pan = 0
        if tilt and not delta['elevation']:
            tilt = 0
        if zoom and not delta['absoluteZoom']:
            zoom = 0

        move = {'pan': pan, 'tilt': tilt, 'zoom': zoom}
        # Initial side of the target per axis; a flip means it was passed.
        direction = {k: v >= 0 for k, v in delta.items()}
        print(move, direction)
        self.continuous(move)
        n = 0
        while sum(map(abs, move.values())):
            n += 1
            current, delta = get_delta()
            changed = False

            if delta and 'azimuth' in delta and 'azimuth' in direction:
                if pan and (not delta['azimuth'] or (delta['azimuth'] >= 0) != direction['azimuth']):
                    pan = 0
                    changed = True
            elif delta:
                print('why no azimuth? delta: %s direction: %s' % (delta, direction))

            if delta and 'elevation' in delta and 'elevation' in direction:
                if tilt and delta and (not delta['elevation'] or (delta['elevation'] >= 0) != direction['elevation']):
                    tilt = 0
                    changed = True
            elif delta:
                print('why no elevation? delta: %s direction: %s' % (delta, direction))

            if delta and 'absoluteZoom' in delta and 'absoluteZoom' in direction:
                if zoom and (not delta['absoluteZoom'] or (delta['absoluteZoom'] >= 0) != direction['absoluteZoom']):
                    zoom = 0
                    changed = True
            elif delta:
                print('no absoluteZoom in delta?', delta, direction)

            # Re-issue the move when an axis stopped, and every 500th poll.
            if changed or not n % 500:
                print('update move', move, current)
                move = {'pan': pan, 'tilt': tilt, 'zoom': zoom}
                self.continuous(self.STOP)
                self.continuous(move)
            if sum(map(abs, move.values())) and not self.abort:
                time.sleep(0.1)
            if self.abort:
                self.continuous(self.STOP)
                return

        self.continuous(self.STOP)
        print(time.time() - t0)
        if zoom_last:
            return self.goto(**goto_last)
        return get_delta()

    def get_preset(self, id):
        """Return the detailed preset dict for *id*.

        Args:
            id: Preset name (``str``) or preset id.

        Raises:
            Exception: If no preset matches *id*.
        """
        presets = self.get_presets(True)
        field = 'name' if isinstance(id, str) else 'id'
        matches = [p for p in presets if p[field] == id]
        if not matches:
            raise Exception('unknown preset %s' % id)
        return matches[0]

    def absolute(self, elevation, azimuth, absoluteZoom):
        """Send an absolute position request.

        Returns:
            ``True`` when the response reports success, else ``False``
            (the response is printed in that case).
        """
        cmd = {
            'AbsoluteHigh': {
                'elevation': elevation,
                'azimuth': azimuth,
                'absoluteZoom': absoluteZoom,
            },
        }
        return self._succeeded(self.put('absolute', cmd))

    def get_presets_xml(self):
        """Return the raw response text of the ``presets`` resource."""
        return self.get('presets')

    def get_presets(self, details=False):
        """Return the presets stored on the camera, without system presets.

        Args:
            details: If true, each preset also has a ``position`` entry
                (the response's ``AbsoluteHigh`` dict).

        Returns:
            A list of dicts with ``id`` and ``name`` (plus ``position``
            when *details* is true).  Presets whose id is in 33-46 or
            90-105 are left out.
        """
        root = xml.etree.ElementTree.fromstring(self.get_presets_xml())
        entries = etree_to_list(root)
        if isinstance(entries, dict):
            # etree_to_list hands back a bare dict when there is exactly one
            # child; normalise so a single preset is handled like many.
            entries = [entries]
        presets = [
            entry['PTZPreset'] for entry in entries
            if entry['PTZPreset']['id'] not in self._SYSTEM_PRESETS
        ]
        for preset in presets:
            preset.pop('enabled', None)
            if details:
                preset['position'] = preset.pop('AbsoluteHigh')
            else:
                preset.pop('AbsoluteHigh', None)
            preset['name'] = preset.pop('presetName')
        return presets
|
|
|