#!/usr/bin/python3 import json import time import xml.etree.ElementTree import requests from requests.auth import HTTPDigestAuth def PTZData(d, root="PTZData"): op = lambda tag: '<' + tag + '>' cl = lambda tag: '\n' ml = lambda v, xml: xml + op(key) + str(v) + cl(key) xml = op(root) + '\n' if root else "" if isinstance(d, list): for v in d: xml = ml(v, xml) else: for key, vl in d.items(): if isinstance(vl, list): for v in vl: xml = ml(v, xml) if isinstance(vl, dict): xml = ml('\n' + PTZData(vl, None), xml) if not isinstance(vl, (list, dict)): xml = ml(vl, xml) xml += cl(root) if root else "" return xml def etree_to_list(t): d = [] k = t.tag.split('}')[-1] v = list(map(etree_to_dict, t.getchildren())) if len(v) == 1: v = v[0] elif v and not isinstance(v[0], dict): v_ = {} for kv in v: for key, value in kv.items(): if isinstance(value, str) and (value.isdigit() or value[1:].isdigit()): value = int(value) v_[key] = value v = v_ return v def etree_to_dict(t): d = {} k = t.tag.split('}')[-1] v = list(map(etree_to_dict, t.getchildren())) if len(v) == 1: v = v[0] #elif v and not isinstance(v[0], dict): else: v_ = {} for kv in v: for key, value in kv.items(): if isinstance(value, str) and (value.isdigit() or value[1:].isdigit()): value = int(value) v_[key] = value v = v_ d = {k: v} #d.update(('@' + k, v) for k, v in t.attrib.items()) if not v: d[k] = t.text elif t.text.strip(): d['text'] = t.text return d def is_close_enough(a, b): return abs(a['elevation'] - b['elevation']) < 2 \ and abs(a['absoluteZoom'] - b['absoluteZoom']) < 2 \ and abs(a['azimuth'] - b['azimuth']) < 2 class Camera: STOP = {'pan': 0, 'tilt': 0, 'zoom': 0} LEFT = {'pan': -1, 'tilt': 0, 'zoom': 0} RIGHT = {'pan': 1, 'tilt': 0, 'zoom': 0} UP = {'pan': 0, 'tilt': 1, 'zoom': 0} DOWN = {'pan': 0, 'tilt': -1, 'zoom': 0} LEFT_UP = {'pan': -1, 'tilt': 1, 'zoom': 0} LEFT_DOWN = {'pan': -1, 'tilt': -1, 'zoom': 0} RIGHT_UP = {'pan': 1, 'tilt': 1, 'zoom': 0} RIGHT_DOWN = {'pan': 1, 'tilt': -1, 'zoom': 0} IN = {'pan': 0, 'tilt': 0, 'zoom': 1} OUT = {'pan': 0, 'tilt': 0, 'zoom': -1} abort = False sequence_time = 0 segment_times = {} next_target = None last_position = {} def __init__(self, ip='192.168.1.64', username='admin', password='admin', channel=1): self.ip = ip self.username = username self.password = password self.channel = channel self.auth = HTTPDigestAuth(self.username, self.password) def put_aux(self, cmd): data = PTZData(cmd, root='PTZAux') r = self.put('auxcontrols/1', data) return r def wipe(self): cmd = { 'id': '1', 'type': 'WIPER', 'status': 'on' } r = self.put_aux(cmd) def url(self, method): return 'http://{}/ISAPI/PTZCtrl/channels/{}/{}'.format(self.ip, self.channel, method) def parse_xml(self, data): return etree_to_dict(xml.etree.ElementTree.fromstring(data)) def status(self): s = self.parse_xml(self.get('status')) if 'PTZStatus' in s: self.last_position = s['PTZStatus']['AbsoluteHigh'] return self.last_position return {} def get(self, method): while True: try: return requests.get(self.url(method), auth=self.auth).text except requests.exceptions.ConnectionError: time.sleep(0.1) def put(self, method, data): if isinstance(data, dict): data = PTZData(data) while True: try: return requests.put(self.url(method), data=data, auth=self.auth).text except requests.exceptions.ConnectionError: time.sleep(0.1) def momentary(self, cmd, duration=None): if duration: cmd['Momentary'] = { 'duration': int(duration * 1000) } r = self.put('momentary', cmd) if '1' not in r: print(r) def continuous(self, cmd, duration=None): r = self.put('continuous', cmd) if '1' not in r: print(r) elif duration: self.sleep(duration) self.continuous(self.STOP) def set_preset(self, id, name): data = PTZData({ 'id': str(id), 'presetName': name }, 'PTZPreset') r = self.put('presets/%s' % id, data) if '1' not in r: print(r) def set_presets(self, presets): for preset in presets: self.absolute(**preset['position']) time.sleep(5) self.goto(**preset['position']) self.set_preset(preset['id'], preset['name']) time.sleep(1) print('presets loaded') def get_hour(self): return time.localtime().tm_hour def hour_changed(self): return self.hour is not None and self.get_hour() != self.hour def sleep(self, seconds): n = float(seconds) if self.abort or self.hour_changed(): return while n > 0: step = min(n, 0.5) time.sleep(step) n -= step if self.abort or self.hour_changed(): return def sequence(self, steps=[], speed=12, goto_first=True, loop=False, hour_loop=False): self.sequence_start = 0 if goto_first: #self.goto_preset(steps[0], pan=100, tilt=100, zoom=100) first = steps[0] steps = steps[1:] if hour_loop: self.hour = self.get_hour() else: self.hour = None while True: if goto_first: segment_t0 = time.time() if isinstance(first, dict): if 'seqid' in first and first['seqid'] not in self.segment_times: self.segment_times[first['seqid']] = 0 first = first['preset'] self.fast_preset(first, True) #self.goto_preset(first, pan=speed, tilt=speed) self.sleep(3) if isinstance(first, dict) and 'sleep' in first: self.sleep(first['sleep']) segment_time = time.time() - segment_t0 if isinstance(first, dict) and 'seqid' in first: self.segment_times[first['seqid']] = segment_time self.sequence_start = t0 = time.time() for step in steps: next_hour = self.get_hour() if hour_loop and next_hour != self.hour: self.hour = next_hour break self.next_target = step segment_t0 = time.time() if self.abort: return print('next step %s at %s - %s' % ( steps.index(step), datetime.now().strftime('%Y-%m-%d_%H:%M:%S'), json.dumps(step) )) if isinstance(step, dict): if 'fast' in step or ('speed' in step and step['speed'] > 100): self.fast_preset(step.get('fast', step.get('preset')), True) else: if 'preset' in step: kwargs = { 'pan': step['speed'], 'tilt': step['speed'], } if step.get('zoom'): kwargs['zoom'] = step['zoom'] if 'zoom_last' in step: kwargs['zoom_last'] = step['zoom_last'] self.goto_preset(step['preset'], **kwargs) if 'sleep' in step: self.sleep(step['sleep']) else: self.goto_preset(step, pan=speed, tilt=speed) segment_time = time.time() - segment_t0 if isinstance(step, dict) and 'seqid' in step: self.segment_times[step['seqid']] = segment_time self.sequence_time = time.time() - t0 self.next_target = None if not loop and not hour_loop: break def fast_preset(self, id, wait=False): self.put('presets/%s/goto' % id, None) if wait: preset = self.get_preset(id)['position'] last = self.status() while not is_close_enough(last, preset): time.sleep(0.1) last = self.status() def goto_preset(self, id, pan, tilt, zoom=1, zoom_last=False): if self.abort or self.hour_changed(): return presets = self.get_presets(True) if isinstance(id, str): preset = [p for p in presets if p['name'] == id] if not preset: raise Exception('unknown preset %s' % id) id = preset[0]['id'] else: preset = [p for p in presets if p['id'] == id] if not preset: raise Exception('unknown preset %s' % id) preset = preset[0] i = preset['position'] if i['elevation'] < 0: i['elevation'] -= 1 i['azimuth'] += 1 print('goto preset', id, i['elevation'], i['azimuth'], i['absoluteZoom']) return self.goto(i['elevation'], i['azimuth'], i['absoluteZoom'], pan=pan, tilt=tilt, zoom=zoom, zoom_last=zoom_last) def goto(self, elevation, azimuth, absoluteZoom, speed=1, pan=None, tilt=None, zoom=None, zoom_last=False): t0 = time.time() start = self.status() target = {'elevation': elevation, 'azimuth': azimuth, 'absoluteZoom': absoluteZoom} print('start:', start, 'target:', target) if zoom_last: goto_last = { 'speed': speed, 'pan': pan, 'tilt': tilt, 'zoom': zoom, } goto_last.update(target) target['absoluteZoom'] = start['absoluteZoom'] def get_delta(): current = self.status() delta = {} for key in current: delta[key] = target[key] - current[key] return current, delta current, delta = get_delta() if pan is None: pan = speed if tilt is None: tilt = speed if zoom is None: zoom = 1 if delta['elevation'] > 0: tilt = -tilt if delta['azimuth'] < 0: pan = -pan if delta['absoluteZoom'] < 0: zoom = -zoom if pan and not delta['azimuth']: pan = 0 if tilt and not delta['elevation']: tilt = 0 if zoom and not delta['absoluteZoom']: zoom = 0 move = {'pan': pan, 'tilt': tilt, 'zoom': zoom} direction = {k: v >= 0 for k, v in delta.items()} print(move, direction) self.continuous(move) n = 0 while sum(map(abs, move.values())): n += 1 #print(current, delta, move) current, delta = get_delta() changed = False if delta and 'azimuth' in delta and 'azimuth' in direction: if pan and (not delta['azimuth'] or (delta['azimuth'] >= 0) != direction['azimuth']): pan = 0 changed = True elif delta: print('why no azimuth? delta: %s direction: %s' % (delta, direction)) ''' if pan > 0: pan = min(pan, delta['azimuth']) change = True elif pan < 0 and delta['azimuth'] < 0: pan = max(pan, delta['azimuth']) changed = True ''' if delta and 'elevation' in delta and 'elevation' in direction: if tilt and delta and (not delta['elevation'] or (delta['elevation'] >= 0) != direction['elevation']): tilt = 0 changed = True elif delta: print('why no elevation? delta: %s direction: %s' % (delta, direction)) ''' if tilt > 0: tilt = min(tilt, delta['elevation']) change = True elif tilt < 0 and delta['elevation'] < 0: tilt = max(tilt, delta['elevation']) changed = True ''' if delta and 'absoluteZoom' in delta and 'absoluteZoom' in direction: if zoom and (not delta['absoluteZoom'] or (delta['absoluteZoom'] >= 0) != direction['absoluteZoom']): zoom = 0 changed = True elif delta: print('no absoluteZoom in delta?', delta, direction) if changed or not n % 500: print('update move', move, current) move = {'pan': pan, 'tilt': tilt, 'zoom': zoom} self.continuous(self.STOP) self.continuous(move) if sum(map(abs, move.values())) and not self.abort: time.sleep(0.1) if self.abort or self.hour_changed(): self.continuous(self.STOP) return self.continuous(self.STOP) print(time.time() - t0) if zoom_last: return self.goto(**goto_last) else: return get_delta() def get_preset(self, id): presets = self.get_presets(True) if isinstance(id, str): preset = [p for p in presets if p['name'] == id] if not preset: raise Exception('unknown preset %s' % id) id = preset[0]['id'] else: preset = [p for p in presets if p['id'] == id] if not preset: raise Exception('unknown preset %s' % id) preset = preset[0] return preset def absolute(self, elevation, azimuth, absoluteZoom): cmd = { 'AbsoluteHigh': { 'elevation': elevation, 'azimuth': azimuth, 'absoluteZoom': absoluteZoom } } r = self.put('absolute', cmd) if '1' not in r: print(r) return False return True def get_presets_xml(self): return self.get('presets') def get_presets(self, details=False): system_presets = list(range(33, 47)) + list(range(90, 106)) presets = etree_to_list(xml.etree.ElementTree.fromstring(self.get('presets'))) presets = [k['PTZPreset'] for k in presets if k['PTZPreset']['id'] not in system_presets] for preset in presets: del preset['enabled'] if not details: del preset['AbsoluteHigh'] else: preset['position'] = preset.pop('AbsoluteHigh') preset['name'] = preset.pop('presetName') return presets