CAMP can capture canals
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

439 lines
15 KiB

#!/usr/bin/python3
import json
import time
import xml.etree.ElementTree
from datetime import datetime
import requests
from requests.auth import HTTPDigestAuth
def PTZData(d, root="PTZData"):
    """Serialize a dict (or a list of dicts) into a simple XML string.

    Each ``key: value`` pair becomes ``<key>value</key>`` followed by a
    newline.  A list value repeats the element once per item, and a dict
    value is serialized recursively inside the element.  Scalar values are
    converted with ``str()`` and XML-escaped.

    Args:
        d: Mapping to serialize.  A list is also accepted when every item is
            itself a dict or list; the items are serialized one after the
            other without any extra wrapper element.
        root: Name of the wrapping element.  A falsy value (``None`` or
            ``""``) omits the wrapper.

    Returns:
        The XML text; every closing tag is followed by ``"\\n"``.

    Raises:
        TypeError: If ``d`` is a list containing a scalar, which has no
            element name to be emitted under.
    """
    # Imported locally so the function does not depend on which ``xml``
    # submodules the rest of the file happens to import.
    from xml.sax.saxutils import escape

    def element(tag, value):
        # Nested dicts keep the historical layout: a newline after the
        # opening tag, then the children.
        if isinstance(value, dict):
            body = '\n' + PTZData(value, None)
        else:
            body = escape(str(value))
        return '<' + tag + '>' + body + '</' + tag + '>\n'

    parts = []
    if root:
        parts.append('<' + root + '>\n')
    if isinstance(d, list):
        for item in d:
            if not isinstance(item, (dict, list)):
                raise TypeError(
                    'cannot serialize %r: list items at the top level need '
                    'an element name' % (item,)
                )
            parts.append(PTZData(item, None))
    else:
        for key, value in d.items():
            if isinstance(value, list):
                parts.extend(element(key, item) for item in value)
            else:
                parts.append(element(key, value))
    if root:
        parts.append('</' + root + '>\n')
    return ''.join(parts)
def etree_to_list(t):
    """Convert the children of an element into a list of dicts.

    Every direct child of ``t`` is converted with ``etree_to_dict`` and the
    results are returned in document order.  Unlike ``etree_to_dict``,
    repeated child tags are kept as separate entries instead of being merged
    into one dict.

    The result is always a list, also when ``t`` has exactly one child, so
    callers can iterate over it without special-casing that situation.

    Args:
        t: An ``xml.etree.ElementTree.Element``.

    Returns:
        A list with one ``{tag: value}`` dict per child (empty when ``t``
        has no children).
    """
    return [etree_to_dict(child) for child in t]
def etree_to_dict(t):
    """Convert an ElementTree element into a nested ``{tag: value}`` dict.

    The namespace part of the tag (``{uri}``) is dropped.  The value is:

    * the element's text (possibly ``None``) when it has no children,
    * the converted child itself when there is exactly one child,
    * otherwise one dict merging all converted children; if several
      children share a tag, the last one wins.  While merging, string
      values that look like integers (optionally signed) become ``int``.

    Values of a *single* child are not converted to ``int``; that matches
    the historical behaviour of this function.

    If the element has children and also non-blank text, the text is stored
    under the extra key ``'text'`` of the returned dict.

    Args:
        t: An ``xml.etree.ElementTree.Element``.

    Returns:
        A dict with the element's local tag name as key.
    """

    def coerce(value):
        # Only convert strings that int() is guaranteed to accept: decimal
        # digits with an optional leading sign.  Anything else (for example
        # a name such as "p1") is returned unchanged.
        if isinstance(value, str):
            digits = value[1:] if value[:1] in ('-', '+') else value
            if digits.isdecimal():
                return int(value)
        return value

    tag = t.tag.split('}')[-1]
    children = [etree_to_dict(child) for child in t]
    if len(children) == 1:
        value = children[0]
    else:
        value = {}
        for child in children:
            for key, item in child.items():
                value[key] = coerce(item)
    result = {tag: value}
    if not value:
        # Leaf element: the value is its text content.
        result[tag] = t.text
    elif t.text and t.text.strip():
        # t.text is None when nothing precedes the first child.
        result['text'] = t.text
    return result
def is_close_enough(a, b, tolerance=2):
    """Return True when two positions agree on every axis within a tolerance.

    Args:
        a: Mapping with numeric ``'elevation'``, ``'absoluteZoom'`` and
            ``'azimuth'`` entries.
        b: Second mapping with the same keys.
        tolerance: The absolute difference on each axis must be strictly
            smaller than this value.  Defaults to 2.

    Returns:
        ``True`` if all three axes differ by less than ``tolerance``.

    Raises:
        KeyError: If one of the three keys is missing from ``a`` or ``b``.
    """
    return all(
        abs(a[axis] - b[axis]) < tolerance
        for axis in ('elevation', 'absoluteZoom', 'azimuth')
    )
class Camera:
    """Client for a PTZ camera controlled through ``/ISAPI/PTZCtrl`` over HTTP.

    Requests are sent with HTTP digest authentication.  Besides thin
    wrappers around the ``continuous``, ``momentary``, ``absolute``,
    ``presets`` and ``status`` resources, the class implements speed
    controlled moves (``goto`` / ``goto_preset``) by polling the reported
    position, and ``sequence`` to run through a list of presets.

    Long running methods check ``self.abort`` and return early once it is
    true.
    """

    # Velocity vectors for continuous() / momentary().
    STOP = {'pan': 0, 'tilt': 0, 'zoom': 0}
    LEFT = {'pan': -1, 'tilt': 0, 'zoom': 0}
    RIGHT = {'pan': 1, 'tilt': 0, 'zoom': 0}
    UP = {'pan': 0, 'tilt': 1, 'zoom': 0}
    DOWN = {'pan': 0, 'tilt': -1, 'zoom': 0}
    LEFT_UP = {'pan': -1, 'tilt': 1, 'zoom': 0}
    LEFT_DOWN = {'pan': -1, 'tilt': -1, 'zoom': 0}
    RIGHT_UP = {'pan': 1, 'tilt': 1, 'zoom': 0}
    RIGHT_DOWN = {'pan': 1, 'tilt': -1, 'zoom': 0}
    IN = {'pan': 0, 'tilt': 0, 'zoom': 1}
    OUT = {'pan': 0, 'tilt': 0, 'zoom': -1}

    # Set to True to make sleep(), sequence(), goto() and friends stop early.
    abort = False
    # Hour (0-23) in which the current hour_loop sequence started, or None
    # when no hour loop is active.  Declared here so hour_changed() also
    # works before sequence() was ever called.
    hour = None
    # Timestamp at which the current pass over the sequence steps started.
    sequence_start = 0
    # Duration in seconds of the last completed pass over the steps.
    sequence_time = 0
    # Duration in seconds of each step, keyed by the step's 'seqid'.
    # Class-level fallback only; __init__ gives every instance its own dict.
    segment_times = {}
    # Step the running sequence is currently heading to (None when idle).
    next_target = None
    # Last position returned by status().
    last_position = {}

    def __init__(self, ip='192.168.1.64', username='admin', password='admin', channel=1):
        """Store the connection settings and prepare digest authentication.

        Args:
            ip: Host (name or address) of the camera.
            username: User name for HTTP digest authentication.
            password: Password for HTTP digest authentication.
            channel: PTZ channel number used in all request URLs.
        """
        self.ip = ip
        self.username = username
        self.password = password
        self.channel = channel
        self.auth = HTTPDigestAuth(self.username, self.password)
        # Per-instance state, so several cameras do not share one dict.
        self.segment_times = {}
        self.last_position = {}

    @staticmethod
    def _succeeded(response):
        """Return True if the response reports status code 1, else print it."""
        if '<statusCode>1</statusCode>' in response:
            return True
        print(response)
        return False

    def put_aux(self, cmd):
        """Send ``cmd`` as a ``PTZAux`` document to ``auxcontrols/1``.

        Returns:
            The response body as text.
        """
        data = PTZData(cmd, root='PTZAux')
        return self.put('auxcontrols/1', data)

    def wipe(self):
        """Switch auxiliary control 1 of type ``WIPER`` on."""
        cmd = {
            'id': '1',
            'type': 'WIPER',
            'status': 'on'
        }
        self.put_aux(cmd)

    def url(self, method):
        """Return the full URL of PTZ resource ``method`` for this channel."""
        return 'http://{}/ISAPI/PTZCtrl/channels/{}/{}'.format(self.ip, self.channel, method)

    def parse_xml(self, data):
        """Parse an XML string into a nested dict (see ``etree_to_dict``)."""
        return etree_to_dict(xml.etree.ElementTree.fromstring(data))

    def status(self):
        """Fetch the current position.

        Returns:
            The ``AbsoluteHigh`` dict of the ``PTZStatus`` response (also
            stored in ``self.last_position``), or ``{}`` when the response
            contains no ``PTZStatus``.
        """
        s = self.parse_xml(self.get('status'))
        if 'PTZStatus' in s:
            self.last_position = s['PTZStatus']['AbsoluteHigh']
            return self.last_position
        return {}

    def get(self, method):
        """GET the PTZ resource ``method`` and return the response text."""
        return requests.get(self.url(method), auth=self.auth).text

    def put(self, method, data):
        """PUT ``data`` to the PTZ resource ``method``.

        Args:
            method: Resource path below the channel URL.
            data: Request body.  A dict is serialized with ``PTZData``;
                anything else (including ``None``) is sent unchanged.

        Returns:
            The response body as text.
        """
        if isinstance(data, dict):
            data = PTZData(data)
        return requests.put(self.url(method), data=data, auth=self.auth).text

    def momentary(self, cmd, duration=None):
        """Send a ``momentary`` move, optionally limited to ``duration``.

        Args:
            cmd: Velocity dict, e.g. ``Camera.LEFT``.  It is not modified.
            duration: Duration in seconds; sent in milliseconds as
                ``Momentary/duration``.  Omitted when falsy.
        """
        if duration:
            # Work on a copy: writing into ``cmd`` would permanently alter
            # shared constants such as Camera.LEFT.
            cmd = dict(cmd, Momentary={'duration': int(duration * 1000)})
        self._succeeded(self.put('momentary', cmd))

    def continuous(self, cmd, duration=None):
        """Start a ``continuous`` move.

        Args:
            cmd: Velocity dict, e.g. ``Camera.RIGHT`` or ``Camera.STOP``.
            duration: If given and the command was accepted, wait that many
                seconds (see ``sleep``) and then send ``STOP``.
        """
        if self._succeeded(self.put('continuous', cmd)) and duration:
            self.sleep(duration)
            self.continuous(self.STOP)

    def set_preset(self, id, name):
        """Store a preset with the given ``id`` and ``name``."""
        data = PTZData({
            'id': str(id),
            'presetName': name
        }, 'PTZPreset')
        self._succeeded(self.put('presets/%s' % id, data))

    def set_presets(self, presets):
        """Move to each preset's position and store it on the camera.

        Args:
            presets: Iterable of dicts with ``'id'``, ``'name'`` and
                ``'position'`` (keyword arguments for ``absolute``).
        """
        for preset in presets:
            self.absolute(**preset['position'])
            time.sleep(5)
            self.goto(**preset['position'])
            self.set_preset(preset['id'], preset['name'])
            time.sleep(1)
        print('presets loaded')

    def get_hour(self):
        """Return the current local hour (0-23)."""
        return time.localtime().tm_hour

    def hour_changed(self):
        """Return True if an hour loop is active and the local hour changed."""
        return self.hour is not None and self.get_hour() != self.hour

    def sleep(self, seconds):
        """Sleep up to ``seconds``, waking early on abort or hour change.

        The wait is split into slices of at most 0.5 seconds and the abort
        flag and hour are checked before the first and after every slice.
        """
        remaining = float(seconds)
        if self.abort or self.hour_changed():
            return
        while remaining > 0:
            step = min(remaining, 0.5)
            time.sleep(step)
            remaining -= step
            if self.abort or self.hour_changed():
                return

    def _run_step(self, step, speed):
        """Execute a single sequence step (see ``sequence`` for the format)."""
        if not isinstance(step, dict):
            self.goto_preset(step, pan=speed, tilt=speed)
            return
        if 'fast' in step or ('speed' in step and step['speed'] > 100):
            self.fast_preset(step.get('fast', step.get('preset')), True)
        elif 'preset' in step:
            # Fall back to the sequence speed when the step has none.
            step_speed = step.get('speed', speed)
            kwargs = {
                'pan': step_speed,
                'tilt': step_speed,
            }
            if step.get('zoom'):
                kwargs['zoom'] = step['zoom']
            if 'zoom_last' in step:
                kwargs['zoom_last'] = step['zoom_last']
            self.goto_preset(step['preset'], **kwargs)
        # NOTE(review): the file's indentation was lost; the pause is assumed
        # to apply to every dict step (fast or not) -- confirm.
        if 'sleep' in step:
            self.sleep(step['sleep'])

    def sequence(self, steps=None, speed=12, goto_first=True, loop=False, hour_loop=False):
        """Run through a list of presets.

        Each step is either a preset id/name, which is approached with
        ``goto_preset`` at ``speed``, or a dict with these optional keys:

        * ``'preset'``: preset id or name to approach,
        * ``'fast'``: preset to jump to with ``fast_preset``,
        * ``'speed'``: pan/tilt speed; values above 100 also use
          ``fast_preset`` (defaults to ``speed``),
        * ``'zoom'``, ``'zoom_last'``: passed on to ``goto_preset``,
        * ``'sleep'``: seconds to pause after the move,
        * ``'seqid'``: key under which the step's duration is stored in
          ``self.segment_times``.

        Args:
            steps: Sequence of steps; nothing happens when it is empty.
            speed: Default pan/tilt speed.
            goto_first: Jump to the first step with ``fast_preset`` and wait
                3 seconds there before running the remaining steps.
            loop: Repeat until ``self.abort`` is set.
            hour_loop: Like ``loop``, and additionally restart from the
                first step whenever the local hour changes.
        """
        steps = list(steps) if steps else []
        self.sequence_start = 0
        self.hour = self.get_hour() if hour_loop else None
        first = None
        if goto_first and steps:
            first, steps = steps[0], steps[1:]
        else:
            goto_first = False
        if not goto_first and not steps:
            # Nothing to do; looping would only spin.
            return

        while True:
            if self.abort:
                return
            if goto_first:
                segment_t0 = time.time()
                first_is_dict = isinstance(first, dict)
                if first_is_dict:
                    if 'seqid' in first and first['seqid'] not in self.segment_times:
                        self.segment_times[first['seqid']] = 0
                    # Keep ``first`` itself intact: its 'sleep' and 'seqid'
                    # entries are still needed below and on later passes.
                    # NOTE(review): the file's indentation was lost; this
                    # assumes every dict first step carries a 'preset' key,
                    # whether or not it has a 'seqid' -- confirm.
                    first_preset = first['preset']
                else:
                    first_preset = first
                self.fast_preset(first_preset, True)
                self.sleep(3)
                if first_is_dict and 'sleep' in first:
                    self.sleep(first['sleep'])
                if first_is_dict and 'seqid' in first:
                    self.segment_times[first['seqid']] = time.time() - segment_t0
            self.sequence_start = t0 = time.time()
            for index, step in enumerate(steps):
                next_hour = self.get_hour()
                if hour_loop and next_hour != self.hour:
                    # New hour: restart the sequence from the first step.
                    self.hour = next_hour
                    break
                self.next_target = step
                segment_t0 = time.time()
                if self.abort:
                    return
                print('next step %s at %s - %s' % (
                    index,
                    datetime.now().strftime('%Y-%m-%d_%H:%M:%S'),
                    json.dumps(step)
                ))
                self._run_step(step, speed)
                if isinstance(step, dict) and 'seqid' in step:
                    self.segment_times[step['seqid']] = time.time() - segment_t0
            self.sequence_time = time.time() - t0
            self.next_target = None
            if not loop and not hour_loop:
                break

    def fast_preset(self, id, wait=False):
        """Tell the camera to go to preset ``id`` on its own.

        Args:
            id: Preset identifier used in the request URL.
            wait: If true, poll ``status`` every 0.1 seconds until the
                reported position is close to the preset's stored position
                (see ``is_close_enough``) or ``self.abort`` is set.
        """
        self.put('presets/%s/goto' % id, None)
        if wait:
            target = self.get_preset(id)['position']
            while not is_close_enough(self.status(), target):
                if self.abort:
                    return
                time.sleep(0.1)

    def goto_preset(self, id, pan, tilt, zoom=1, zoom_last=False):
        """Move to a preset's stored position with ``goto``.

        Args:
            id: Preset id, or preset name when it is a ``str``.
            pan: Pan speed.
            tilt: Tilt speed.
            zoom: Zoom speed.
            zoom_last: Passed on to ``goto``.

        Returns:
            The result of ``goto``, or ``None`` when aborted or the hour
            changed before the move started.

        Raises:
            LookupError: If no such preset exists.
        """
        if self.abort or self.hour_changed():
            return
        preset = self.get_preset(id)
        i = preset['position']
        # NOTE(review): the file's indentation was lost; both adjustments are
        # assumed to apply only to presets with negative elevation -- confirm.
        if i['elevation'] < 0:
            i['elevation'] -= 1
            i['azimuth'] += 1
        print('goto preset', preset['id'], i['elevation'], i['azimuth'], i['absoluteZoom'])
        return self.goto(i['elevation'], i['azimuth'], i['absoluteZoom'],
                         pan=pan, tilt=tilt, zoom=zoom, zoom_last=zoom_last)

    def goto(self, elevation, azimuth, absoluteZoom, speed=1, pan=None, tilt=None, zoom=None, zoom_last=False):
        """Move to a position with continuous moves at a chosen speed.

        A continuous move is started towards the target and the position is
        polled every 0.1 seconds.  An axis is stopped as soon as its
        remaining difference is zero or changes sign; the method ends when
        all axes stopped, ``self.abort`` is set, or the hour changed.

        Args:
            elevation: Target elevation.
            azimuth: Target azimuth.
            absoluteZoom: Target zoom.
            speed: Default speed for pan and tilt.
            pan: Pan speed (defaults to ``speed``).
            tilt: Tilt speed (defaults to ``speed``).
            zoom: Zoom speed (defaults to 1).
            zoom_last: First pan/tilt at the current zoom level, then zoom
                in a second ``goto`` call.

        Returns:
            A ``(current, delta)`` tuple with the last reported position and
            the remaining difference per axis, the result of the second
            call when ``zoom_last`` is set, or ``None`` when interrupted.
        """
        t0 = time.time()
        start = self.status()
        target = {'elevation': elevation, 'azimuth': azimuth, 'absoluteZoom': absoluteZoom}
        print('start:', start, 'target:', target)
        goto_last = None
        if zoom_last:
            # Remember the full request for the second pass, then keep the
            # zoom where it is during this one.
            goto_last = dict(target, speed=speed, pan=pan, tilt=tilt, zoom=zoom)
            target['absoluteZoom'] = start['absoluteZoom']

        def get_delta():
            current = self.status()
            # Only axes present in both dicts; extra status fields are ignored.
            delta = {key: target[key] - current[key] for key in target if key in current}
            return current, delta

        current, delta = get_delta()
        if pan is None:
            pan = speed
        if tilt is None:
            tilt = speed
        if zoom is None:
            zoom = 1
        # Pick the sign of each speed from the direction still to travel.
        if delta['elevation'] > 0:
            tilt = -tilt
        if delta['azimuth'] < 0:
            pan = -pan
        if delta['absoluteZoom'] < 0:
            zoom = -zoom
        # Axes that are already on target do not move at all.
        if pan and not delta['azimuth']:
            pan = 0
        if tilt and not delta['elevation']:
            tilt = 0
        if zoom and not delta['absoluteZoom']:
            zoom = 0
        move = {'pan': pan, 'tilt': tilt, 'zoom': zoom}
        # Initial sign of each difference; a flip means the axis overshot.
        direction = {k: v >= 0 for k, v in delta.items()}
        print(move, direction)
        self.continuous(move)
        n = 0
        while sum(map(abs, move.values())):
            n += 1
            current, delta = get_delta()
            changed = False
            if delta and 'azimuth' in delta and 'azimuth' in direction:
                if pan and (not delta['azimuth'] or (delta['azimuth'] >= 0) != direction['azimuth']):
                    pan = 0
                    changed = True
            elif delta:
                print('why no azimuth? delta: %s direction: %s' % (delta, direction))
            if delta and 'elevation' in delta and 'elevation' in direction:
                if tilt and (not delta['elevation'] or (delta['elevation'] >= 0) != direction['elevation']):
                    tilt = 0
                    changed = True
            elif delta:
                print('why no elevation? delta: %s direction: %s' % (delta, direction))
            if delta and 'absoluteZoom' in delta and 'absoluteZoom' in direction:
                if zoom and (not delta['absoluteZoom'] or (delta['absoluteZoom'] >= 0) != direction['absoluteZoom']):
                    zoom = 0
                    changed = True
            elif delta:
                print('no absoluteZoom in delta?', delta, direction)
            # Re-send the command when an axis stopped, and every 500 polls.
            if changed or not n % 500:
                print('update move', move, current)
                move = {'pan': pan, 'tilt': tilt, 'zoom': zoom}
                self.continuous(self.STOP)
                self.continuous(move)
            if sum(map(abs, move.values())) and not self.abort:
                time.sleep(0.1)
            if self.abort or self.hour_changed():
                self.continuous(self.STOP)
                return
        self.continuous(self.STOP)
        print(time.time() - t0)
        if zoom_last:
            return self.goto(**goto_last)
        return get_delta()

    def get_preset(self, id):
        """Look up one preset including its position.

        Args:
            id: Preset id, or preset name when it is a ``str``.

        Returns:
            The preset dict with ``'id'``, ``'name'`` and ``'position'``.

        Raises:
            LookupError: If no such preset exists.
        """
        field = 'name' if isinstance(id, str) else 'id'
        for preset in self.get_presets(True):
            if preset[field] == id:
                return preset
        raise LookupError('unknown preset %s' % id)

    def absolute(self, elevation, azimuth, absoluteZoom):
        """Send an ``absolute`` move to the given position.

        Returns:
            ``True`` if the camera accepted the command, else ``False`` (the
            response is printed in that case).
        """
        cmd = {
            'AbsoluteHigh': {
                'elevation': elevation,
                'azimuth': azimuth,
                'absoluteZoom': absoluteZoom
            }
        }
        return self._succeeded(self.put('absolute', cmd))

    def get_presets_xml(self):
        """Return the raw response text of the ``presets`` resource."""
        return self.get('presets')

    def get_presets(self, details=False):
        """Return the presets stored on the camera.

        Presets with ids 33-46 and 90-105 are left out.

        Args:
            details: If true, each preset carries its stored position under
                ``'position'``; otherwise the position is dropped.

        Returns:
            A list of dicts with ``'id'``, ``'name'`` and, with
            ``details``, ``'position'``.
        """
        system_presets = set(range(33, 47)) | set(range(90, 106))
        entries = etree_to_list(xml.etree.ElementTree.fromstring(self.get('presets')))
        if isinstance(entries, dict):
            # A single preset may arrive as a bare dict instead of a list.
            entries = [entries]
        presets = [
            entry['PTZPreset'] for entry in entries
            if entry['PTZPreset']['id'] not in system_presets
        ]
        for preset in presets:
            preset.pop('enabled', None)
            if not details:
                preset.pop('AbsoluteHigh', None)
            else:
                preset['position'] = preset.pop('AbsoluteHigh')
            preset['name'] = preset.pop('presetName')
        return presets