#!/usr/bin/env python # Copyright 2019 Kent A. Vander Velden # # If you use this software, please consider contacting me. I'd like to hear # about your work. # # This file is part of DMM-DYN4. # # DMM-Dyn4 is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # DMM-DYN4 is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with DMM-DYN4. If not, see . from __future__ import print_function import sys import time import serial import numpy as np def sign_extend(value, bits): # from: https://stackoverflow.com/a/32031543 sign_bit = 1 << (bits - 1) return (value & (sign_bit - 1)) - (value & sign_bit) class DMMException(Exception): def __init__(self): Exception.__init__(self) class DMMTimeout(DMMException): def __init__(self): Exception.__init__(self) class DMMExceptionUnexpectedLength(DMMException): def __init__(self, n, expected_n): DMMException.__init__(self) self.n = n self.expected_n = expected_n class DMMExceptionTruncatedWrite(DMMException): def __init__(self, n, expected_n): DMMException.__init__(self) self.n = n self.expected_n = expected_n class DMMExceptionUnknownFunctionID(DMMException): def __init__(self, func_id): DMMException.__init__(self) self.func_id = func_id class DMMExceptionUnknownFunctionID(DMMException): def __init__(self, func_id): DMMException.__init__(self) self.func_id = func_id class DMMExceptionUnexpectedFunc(DMMException): def __init__(self): DMMException.__init__(self) class DMMDrive: def __init__(self, serial_dev, drive_id): self.serial = serial.Serial(serial_dev, 38400, timeout=None, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS) self.drive_id = drive_id # print(dir(self.serial)) self.flush() # print(serial.VERSION) # print(self.serial.isOpen()) self.debug = False self.torque_arr = [] def __enter__(self): return self def __exit__(self, type, value, traceback): self.serial.close() def flush(self): if serial.VERSION > '2.5': self.serial.reset_input_buffer() else: self.serial.flushInput() self.serial.timeout = .05 while len(self.serial.read(1)) > 0: pass @staticmethod def verify_func_id(func_id): if not (0x10 <= func_id <= 0x1b or func_id == 0x1e): raise DMMExceptionUnknownFunctionID(func_id) def general_read2(self, func_id2): packet = [0x00 | self.drive_id, 0x80 | (0 << 5) | func_id2, 0x80] packet += [0x80 | (sum(packet) & 0x7f)] if self.debug: print([hex(x) for x in packet]) packet = bytearray(packet) n = self.serial.write(packet) if n != len(packet): raise DMMExceptionTruncatedWrite(n, len(packet)) host_fids = { 'Set_Origin': 0x00, 'Go_Absolute_Pos': 0x01, 'Make_LinearLine': 0x02, 'Go_Relative_Pos': 0x03, 'Make_CircularArc': 0x04, 'Assign_Drive_ID': 0x05, 'Read_Drive_ID': 0x06, 'Set_Drive_Config': 0x07, 'Read_Drive_Config': 0x08, 'Read_Drive_Status': 0x09, 'Turn_ConstSpeed': 0x0a, 'Square_Wave': 0x0b, 'Sin_Wave': 0x0c, 'SS_Frequency': 0x0d, 'General_Read': 0x0e, 'ForMotorDefine': 0x0f, 'Set_MainGain': 0x10, 'Set_SpeedGain': 0x11, 'Set_IntGain': 0x12, 'Set_TrqCons': 0x13, 'Set_HighSpeed': 0x14, 'Set_HighAccel': 0x15, 'Set_Pos_OnRange': 0x16, 'Set_GearNumber': 0x17, 'Read_MainGain': 0x18, 'Read_SpeedGain': 0x19, 'Read_IntGain': 0x1a, 'Read_TrqCons': 0x1b, 'Read_HighSpeed': 0x1c, 'Read_HighAccel': 0x1d, 'Read_Pos_OnRange': 0x1e, 'Read_GearNumber': 0x1f} dyn_fids = { 'Is_MainGain': 0x10, 'Is_SpeedGain': 0x11, 'Is_IntGain': 0x12, 'Is_TrqCons': 0x13, 'Is_HighSpeed': 0x14, 'Is_HighAccel': 0x15, 'Is_Drive_ID': 0x16, 'Is_Pos_OnRange': 0x17, 'Is_GearNumber': 0x18, 'Is_Status': 0x19, 'Is_Config': 0x1a, 'Is_AbsPos32': 0x1b, 'Is_TrqCurrent': 0x1e} def read_MainGain(self): self.general_read2(self.host_fids['Read_MainGain']) return self.check_response(self.dyn_fids['Is_MainGain']) def read_SpeedGain(self): self.general_read2(self.host_fids['Read_SpeedGain']) return self.check_response(self.dyn_fids['Is_SpeedGain']) def read_IntGain(self): self.general_read2(self.host_fids['Read_IntGain']) return self.check_response(self.dyn_fids['Is_IntGain']) def read_TrqCons(self): self.general_read2(self.host_fids['Read_TrqCons']) return self.check_response(self.dyn_fids['Is_TrqCons']) def read_HighSpeed(self): self.general_read2(self.host_fids['Read_HighSpeed']) return self.check_response(self.dyn_fids['Is_HighSpeed']) def read_HighAccel(self): self.general_read2(self.host_fids['Read_HighAccel']) return self.check_response(self.dyn_fids['Is_HighAccel']) def read_Pos_OnRange(self): self.general_read2(self.host_fids['Read_Pos_OnRange']) return self.check_response(self.dyn_fids['Is_Pos_OnRange']) def read_GearNumber(self): self.general_read2(self.host_fids['Read_GearNumber']) return self.check_response(self.dyn_fids['Is_GearNumber']) def read_Status(self): self.general_read2(self.host_fids['Read_Drive_Status']) return self.check_response(self.dyn_fids['Is_Status']) def read_Config(self): self.general_read2(self.host_fids['Read_Drive_Config']) return self.check_response(self.dyn_fids['Is_Config']) def read_AbsPos32(self): self.general_read(self.dyn_fids['Is_AbsPos32']) return self.check_response(self.dyn_fids['Is_AbsPos32']) def read_TrqCurrent(self): self.general_read(self.dyn_fids['Is_TrqCurrent']) return self.check_response(self.dyn_fids['Is_TrqCurrent']) def set_Config(self): func_id2 = self.host_fids['Set_Drive_Config'] # Toggling the enable bit (b5) does not effect the drive (only tested in analog mode) cnf = ((0 << 6) | # TBD (1 << 5) | # b5 = 0 : let Drive servo # b5 = 1 : let Drive free, motor could be turned freely (0x01 << 3) | # b4 b3 = 0 : Position servo as default # 1 : Speed servo # 2 : Torque servo # 3 : TBD (0 << 2) | # b2 = 0 : works as relative mode(default) like normal optical encoder # b2 = 1 : works as absolute position system, motor will back to absolute zero or POS2(Stored in # sensor) automatically after power on reset. (0x03 << 0)) # b1 b0 = 0 : RS232 mode # 1 : CW,CCW mode # 2 : Pulse/Dir or (SPI mode Optional) # 3 : Anlog mode packet = [0x00 | self.drive_id, 0x80 | (0 << 5) | func_id2, 0x80 | cnf] packet += [0x80 | (sum(packet) & 0x7f)] if self.debug: print([hex(x) for x in packet]) packet = bytearray(packet) n = self.serial.write(packet) if n != len(packet): raise DMMExceptionTruncatedWrite(n, len(packet)) def check_response(self, expected_func_id, max_attempts=3): for i in range(max_attempts): func_id, v = self.read_response() if func_id == expected_func_id: break if i == max_attempts: raise DMMExceptionUnexpectedFunc() return v def general_read(self, func_id): self.verify_func_id(func_id) packet = [0x00 | self.drive_id, 0x80 | (0 << 5) | 0x0e, 0x80 | (func_id & 0x7f)] packet += [0x80 | (sum(packet) & 0x7f)] if self.debug: print([hex(x) for x in packet]) packet = bytearray(packet) n = self.serial.write(packet) if n != len(packet): raise DMMExceptionTruncatedWrite(n, len(packet)) def read_response(self): if self.debug: print('read response') self.serial.timeout = .05 arr = [] first_byte = False expected_len = 0 timed_out = False while True: x = self.serial.read(1) if len(x) == 0: # timeout occured timed_out = True break x = ord(x) first_byte = (x & 0x80) == 0 if first_byte: if self.debug: print('first byte') first_byte = True expected_len = 0 arr = [] arr += [x] if len(arr) == 2: expected_len = 4 + ((x >> 5) & 0x03) # x = x & 0x7f if self.debug: print(hex(x)) if len(arr) == expected_len: break if timed_out: raise DMMTimeout() if self.debug: print(expected_len, len(arr), [hex(x) for x in arr]) def verify_length(arr, expected_n): n = (arr[1] >> 5) & 0x03 # print(hex(arr[1]), n, expected_n) if n != expected_n: raise DMMExceptionUnexpectedLength(n, expected_n) if arr and expected_len == len(arr): drive_id = arr[0] & 0x7f func_id = arr[1] & 0x1f crc_check = ((sum(arr[:-1]) ^ arr[-1]) & 0x7f) == 0 if self.debug: print(crc_check) print(hex(func_id)) if crc_check: if 0x00 <= func_id <= 0x0a or 0x1e < func_id: print('Unallowed address read:', func_id) elif func_id == 0x10: # Is_MainGain verify_length(arr, 0) x = arr[2] & 0x7f elif func_id == 0x11: # Is_SpeedGain verify_length(arr, 0) x = arr[2] & 0x7f elif func_id == 0x12: # Is_IntGain verify_length(arr, 0) x = arr[2] & 0x7f elif func_id == 0x13: # Is_TrqCons verify_length(arr, 0) x = arr[2] & 0x7f elif func_id == 0x14: # Is_HighSpeed verify_length(arr, 0) x = arr[2] & 0x7f elif func_id == 0x15: # Is_HighAccel verify_length(arr, 0) x = arr[2] & 0x7f elif func_id == 0x16: # Is_Drive_ID verify_length(arr, 0) x = arr[2] & 0x7f elif func_id == 0x17: # Is_Pos_OnRange verify_length(arr, 0) x = arr[2] & 0x7f elif func_id == 0x18: # Is_GearNumber verify_length(arr, 3) # self.debug = True # print([hex(x) for x in arr]) # x = self.read_signed_val(arr) # self.debug = False x = [((arr[2] & 0x7f) << 7) | (arr[3] & 0x7f), ((arr[4] & 0x7f) << 7) | (arr[5] & 0x7f)] elif func_id == 0x19: # Is_Status verify_length(arr, 0) x = arr[2] & 0x7f d = {} if x & (1 << 0): d['in position'] = '0 (motor busy, or |Pset - Pmotor|> OnRange)' d['in position'] = '0' else: d['in position'] = '1 (On position, i.e. |Pset - Pmotor| < = OnRange)' d['in position'] = '1' if x & (1 << 1): d['motor'] = 'motor free' else: d['motor'] = 'motor servo' x2 = (x >> 2) & 0x7 if x2 == 0: d['alarm'] = 'No alarm' d['alarm'] = '' elif x2 == 1: d['alarm'] = 'motor lost phase alarm, |Pset - Pmotor|>8192(steps), 180(deg)' d['alarm'] = 'lost phase' elif x2 == 2: d['alarm'] = 'motor over current alarm' d['alarm'] = 'over current' elif x2 == 3: d['alarm'] = 'motor overheat alarm, or motor over power' d['alarm'] = 'overheat or over power' elif x2 == 4: d['alarm'] = 'there is error for CRC code check, refuse to accept current command' d['alarm'] = 'corrupt command' elif x2 > 4: d['alarm'] = 'TBD' if x & (1 << 5): d['motion'] = 'busy (means built in S-curve, linear, circular motion is busy on current motion)' d['motion'] = 'busy' else: d[ 'motion'] = 'completed (means built in S-curve, linear, circular motion completed; waiting for next motion)' d['motion'] = 'completed' if x & (1 << 6): d['pin2'] = '1 (pin2 status of JP3,used for Host PC to detect CNC zero position or others)' d['pin2'] = '1' else: d['pin2'] = '0 (pin2 status of JP3,used for Host PC to detect CNC zero position or others)' d['pin2'] = '0' x = d elif func_id == 0x1a: # Is_Config verify_length(arr, 0) x = arr[2] & 0x7f print(hex(x)) d = {} x2 = x & 0x03 if x2 == 0: d['input mode'] = 'RS232 mode' d['input mode'] = 'RS232' elif x2 == 1: d['input mode'] = 'CW/CCW mode' d['input mode'] = 'CW/CCW' elif x2 == 2: d['input mode'] = 'Pulse/Dir or (SPI mode Optional)' d['input mode'] = 'pulse/dir' elif x2 == 3: d['input mode'] = 'Analog mode' d['input mode'] = 'analog' if x & (1 << 2): d['positioning'] = 'works as absolute position system, motor will back to absolute zero or POS2(Stored in sensor) automatically after power on reset.' d['positioning'] = 'absolute' else: d['positioning'] = 'relative mode(default) like normal optical encoder' d['positioning'] = 'relative' x2 = (x >> 3) & 0x03 if x2 == 0: d['servo mode'] = 'Position servo as default' d['servo mode'] = 'position' elif x2 == 1: d['servo mode'] = 'Speed servo' d['servo mode'] = 'speed' elif x2 == 2: d['servo mode'] = 'Torque servo' d['servo mode'] = 'torque' elif x2 == 3: d['servo mode'] = 'TBD' if x & (1 << 5): d['enabled'] = 'let drive servo' d['enabled'] = 'yes' else: d['enabled'] = 'let Drive free, motor could be turned freely' d['enabled'] = 'no' if x & (1 << 6): d['b6'] = '1 TBD' else: d['b6'] = '0 TBD' x = d elif func_id == 0x1b: # Is_AbsPos32 x = self.read_signed_val(arr) elif func_id in [0x1c, 0x1d]: print('Unknown address read:', func_id) elif func_id == 0x1e: # Is_TrqCurrent x = self.read_signed_val(arr) return func_id, x def read_signed_val(self, arr): arr2 = arr[2:-1] x = sign_extend(arr2[0] << 1, 8) >> 1 if self.debug: print('first byte', [hex(arr2[0]), hex(x), x]) for y in arr2[1:]: x = (x << 7) + (y & 0x7f) if self.debug: print(x, [hex(x_) for x_ in arr2]) return x def measure_speed(self, integration_time=.1): p1 = self.read_AbsPos32() t1 = time.time() time.sleep(integration_time) p2 = self.read_AbsPos32() t2 = time.time() encoder_ppr = 65536. rpm = (p2 - p1) / (t2 - t1) * 60. / encoder_ppr return rpm def set_speed(self, rpm): packet = [0x00 | self.drive_id, 0x80 | (3 << 5) | 0x0a, 0x80 | ((rpm >> (7 * 3)) & 0x7f), 0x80 | ((rpm >> (7 * 2)) & 0x7f), 0x80 | ((rpm >> (7 * 1)) & 0x7f), 0x80 | ((rpm >> (7 * 0)) & 0x7f)] packet += [0x80 | (sum(packet) & 0x7f)] if self.debug: print([hex(x) for x in packet]) packet = bytearray(packet) n = self.serial.write(packet) if n != len(packet): raise DMMExceptionTruncatedWrite(n, len(packet)) def integrate_TrqCurrent(self, max_dt=1.): st = time.time() dt = 0. arr = [] while dt < max_dt: arr += [self.read_TrqCurrent()] dt = time.time() - st # = [dmm.read_TrqCurrent() for _ in range(100)] # print(arr) d = {'min': np.min(arr), 'max': np.max(arr), 'mean': np.mean(arr), 'median': np.median(arr), 'stddev': np.std(arr)} print(d) arr = np.abs(arr) d = {'min': np.min(arr), 'max': np.max(arr), 'mean': np.mean(arr), 'median': np.median(arr), 'stddev': np.std(arr)} print(d) def update_TrqCurrent(self, max_dt=1.): st = time.time() dt = 0. st10 = time.time() self.torque_arr += [(st, self.read_TrqCurrent())] st11 = time.time() print('round trip:', st11 - st10) dt = time.time() - st self.torque_arr = [v for v in self.torque_arr if (st - v[0]) <= max_dt] print('len(arr):', len(self.torque_arr)) # = [dmm.read_TrqCurrent() for _ in range(100)] # print(self.torque_arr) arr = [v[1] for v in self.torque_arr] d = {'min': np.min(arr), 'max': np.max(arr), 'mean': np.mean(arr), 'median': np.median(arr), 'stddev': np.std(arr)} print(d) arr = np.abs(arr) d = {'min': np.min(arr), 'max': np.max(arr), 'mean': np.mean(arr), 'median': np.median(arr), 'stddev': np.std(arr)} print(d) arr = [v[1] for v in self.torque_arr] return np.mean(arr), arr[-1] def find_device(): devs = [] return '/dev/ttyUSB0' global serial if serial.VERSION > '2.5': import serial.tools.list_ports for dev in serial.tools.list_ports.comports(): if dev.description == 'FT230X Basic UART' and dev.manufacturer == 'FTDI' and dev.product == 'FT230X Basic UART': devs += [dev.device] else: # LinuxCNC 2.7.15 is still tied to Python2 import glob devs = glob.glob('/dev/ttyUSB*') if not devs: print('No known serial devices found.') return '' if len(devs) > 1: print('More than one serial devices found...') for dev in devs: print(dev) print('Selecting first serial device.') print('Using device at:', devs[0]) return devs[0] def serial_loop(dev_fn): with DMMDrive(dev_fn, 0) as dmm: d = {} d['Status'] = dmm.read_Status() d['MainGain'] = dmm.read_MainGain() d['SpeedGain'] = dmm.read_SpeedGain() d['IntGain'] = dmm.read_IntGain() d['TrqCons'] = dmm.read_TrqCons() d['HighSpeed'] = dmm.read_HighSpeed() d['HighAccel'] = dmm.read_HighAccel() d['Pos_OnRange'] = dmm.read_Pos_OnRange() d['GearNumber'] = dmm.read_GearNumber() d['Status'] = dmm.read_Status() d['Config'] = dmm.read_Config() d['AbsPos32'] = dmm.read_AbsPos32() d['TrqCurrent'] = dmm.read_TrqCurrent() if True: dmm.set_speed(-30) time.sleep(.5) d['Speed'] = dmm.measure_speed() print(d) while True: time.sleep(.5) print(dmm.measure_speed()) dmm.integrate_TrqCurrent() def main(): try: while True: dev_fn = find_device() if dev_fn: try: serial_loop(dev_fn) except DMMTimeout: print('Timedout') # raise except serial.serialutil.SerialException as e: # SerialException: could not open port /dev/ttyUSB1: [Errno 2] No such file or directory: '/dev/ttyUSB1' # SerialException: device reports readiness to read but returned no data (device disconnected?) print('SerialException:', e) print('Kicked out... resting before retrying.') time.sleep(1) except KeyboardInterrupt: pass if __name__ == "__main__": main()