700 lines
23 KiB
Python
700 lines
23 KiB
Python
|
#!/usr/bin/env python
|
||
|
|
||
|
# Copyright 2019 Kent A. Vander Velden <kent.vandervelden@gmail.com>
|
||
|
#
|
||
|
# If you use this software, please consider contacting me. I'd like to hear
|
||
|
# about your work.
|
||
|
#
|
||
|
# This file is part of DMM-DYN4.
|
||
|
#
|
||
|
# DMM-Dyn4 is free software: you can redistribute it and/or modify
|
||
|
# it under the terms of the GNU General Public License as published by
|
||
|
# the Free Software Foundation, either version 3 of the License, or
|
||
|
# (at your option) any later version.
|
||
|
#
|
||
|
# DMM-DYN4 is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
# GNU General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU General Public License
|
||
|
# along with DMM-DYN4. If not, see <https://www.gnu.org/licenses/>.
|
||
|
|
||
|
|
||
|
from __future__ import print_function
|
||
|
|
||
|
import sys
|
||
|
import time
|
||
|
import serial
|
||
|
|
||
|
import numpy as np
|
||
|
|
||
|
|
||
|
def sign_extend(value, bits):
|
||
|
# from: https://stackoverflow.com/a/32031543
|
||
|
sign_bit = 1 << (bits - 1)
|
||
|
return (value & (sign_bit - 1)) - (value & sign_bit)
|
||
|
|
||
|
|
||
|
class DMMException(Exception):
|
||
|
def __init__(self):
|
||
|
Exception.__init__(self)
|
||
|
|
||
|
|
||
|
class DMMTimeout(DMMException):
|
||
|
def __init__(self):
|
||
|
Exception.__init__(self)
|
||
|
|
||
|
|
||
|
class DMMExceptionUnexpectedLength(DMMException):
|
||
|
def __init__(self, n, expected_n):
|
||
|
DMMException.__init__(self)
|
||
|
self.n = n
|
||
|
self.expected_n = expected_n
|
||
|
|
||
|
|
||
|
class DMMExceptionTruncatedWrite(DMMException):
|
||
|
def __init__(self, n, expected_n):
|
||
|
DMMException.__init__(self)
|
||
|
self.n = n
|
||
|
self.expected_n = expected_n
|
||
|
|
||
|
|
||
|
class DMMExceptionUnknownFunctionID(DMMException):
|
||
|
def __init__(self, func_id):
|
||
|
DMMException.__init__(self)
|
||
|
self.func_id = func_id
|
||
|
|
||
|
|
||
|
class DMMExceptionUnknownFunctionID(DMMException):
|
||
|
def __init__(self, func_id):
|
||
|
DMMException.__init__(self)
|
||
|
self.func_id = func_id
|
||
|
|
||
|
|
||
|
class DMMExceptionUnexpectedFunc(DMMException):
|
||
|
def __init__(self):
|
||
|
DMMException.__init__(self)
|
||
|
|
||
|
|
||
|
class DMMDrive:
|
||
|
def __init__(self, serial_dev, drive_id):
|
||
|
self.serial = serial.Serial(serial_dev,
|
||
|
38400,
|
||
|
timeout=None,
|
||
|
parity=serial.PARITY_NONE,
|
||
|
stopbits=serial.STOPBITS_ONE,
|
||
|
bytesize=serial.EIGHTBITS)
|
||
|
self.drive_id = drive_id
|
||
|
|
||
|
# print(dir(self.serial))
|
||
|
|
||
|
self.flush()
|
||
|
|
||
|
# print(serial.VERSION)
|
||
|
# print(self.serial.isOpen())
|
||
|
self.debug = False
|
||
|
|
||
|
self.torque_arr = []
|
||
|
|
||
|
|
||
|
def __enter__(self):
|
||
|
return self
|
||
|
|
||
|
def __exit__(self, type, value, traceback):
|
||
|
self.serial.close()
|
||
|
|
||
|
def flush(self):
|
||
|
if serial.VERSION > '2.5':
|
||
|
self.serial.reset_input_buffer()
|
||
|
else:
|
||
|
self.serial.flushInput()
|
||
|
|
||
|
self.serial.timeout = .05
|
||
|
while len(self.serial.read(1)) > 0:
|
||
|
pass
|
||
|
|
||
|
@staticmethod
|
||
|
def verify_func_id(func_id):
|
||
|
if not (0x10 <= func_id <= 0x1b or func_id == 0x1e):
|
||
|
raise DMMExceptionUnknownFunctionID(func_id)
|
||
|
|
||
|
def general_read2(self, func_id2):
|
||
|
packet = [0x00 | self.drive_id,
|
||
|
0x80 | (0 << 5) | func_id2,
|
||
|
0x80]
|
||
|
packet += [0x80 | (sum(packet) & 0x7f)]
|
||
|
|
||
|
if self.debug:
|
||
|
print([hex(x) for x in packet])
|
||
|
|
||
|
packet = bytearray(packet)
|
||
|
n = self.serial.write(packet)
|
||
|
|
||
|
if n != len(packet):
|
||
|
raise DMMExceptionTruncatedWrite(n, len(packet))
|
||
|
|
||
|
host_fids = {
|
||
|
'Set_Origin': 0x00,
|
||
|
'Go_Absolute_Pos': 0x01,
|
||
|
'Make_LinearLine': 0x02,
|
||
|
'Go_Relative_Pos': 0x03,
|
||
|
'Make_CircularArc': 0x04,
|
||
|
'Assign_Drive_ID': 0x05,
|
||
|
'Read_Drive_ID': 0x06,
|
||
|
'Set_Drive_Config': 0x07,
|
||
|
'Read_Drive_Config': 0x08,
|
||
|
'Read_Drive_Status': 0x09,
|
||
|
'Turn_ConstSpeed': 0x0a,
|
||
|
'Square_Wave': 0x0b,
|
||
|
'Sin_Wave': 0x0c,
|
||
|
'SS_Frequency': 0x0d,
|
||
|
'General_Read': 0x0e,
|
||
|
'ForMotorDefine': 0x0f,
|
||
|
'Set_MainGain': 0x10,
|
||
|
'Set_SpeedGain': 0x11,
|
||
|
'Set_IntGain': 0x12,
|
||
|
'Set_TrqCons': 0x13,
|
||
|
'Set_HighSpeed': 0x14,
|
||
|
'Set_HighAccel': 0x15,
|
||
|
'Set_Pos_OnRange': 0x16,
|
||
|
'Set_GearNumber': 0x17,
|
||
|
'Read_MainGain': 0x18,
|
||
|
'Read_SpeedGain': 0x19,
|
||
|
'Read_IntGain': 0x1a,
|
||
|
'Read_TrqCons': 0x1b,
|
||
|
'Read_HighSpeed': 0x1c,
|
||
|
'Read_HighAccel': 0x1d,
|
||
|
'Read_Pos_OnRange': 0x1e,
|
||
|
'Read_GearNumber': 0x1f}
|
||
|
|
||
|
dyn_fids = {
|
||
|
'Is_MainGain': 0x10,
|
||
|
'Is_SpeedGain': 0x11,
|
||
|
'Is_IntGain': 0x12,
|
||
|
'Is_TrqCons': 0x13,
|
||
|
'Is_HighSpeed': 0x14,
|
||
|
'Is_HighAccel': 0x15,
|
||
|
'Is_Drive_ID': 0x16,
|
||
|
'Is_Pos_OnRange': 0x17,
|
||
|
'Is_GearNumber': 0x18,
|
||
|
'Is_Status': 0x19,
|
||
|
'Is_Config': 0x1a,
|
||
|
'Is_AbsPos32': 0x1b,
|
||
|
'Is_TrqCurrent': 0x1e}
|
||
|
|
||
|
def read_MainGain(self):
|
||
|
self.general_read2(self.host_fids['Read_MainGain'])
|
||
|
return self.check_response(self.dyn_fids['Is_MainGain'])
|
||
|
|
||
|
def read_SpeedGain(self):
|
||
|
self.general_read2(self.host_fids['Read_SpeedGain'])
|
||
|
return self.check_response(self.dyn_fids['Is_SpeedGain'])
|
||
|
|
||
|
def read_IntGain(self):
|
||
|
self.general_read2(self.host_fids['Read_IntGain'])
|
||
|
return self.check_response(self.dyn_fids['Is_IntGain'])
|
||
|
|
||
|
def read_TrqCons(self):
|
||
|
self.general_read2(self.host_fids['Read_TrqCons'])
|
||
|
return self.check_response(self.dyn_fids['Is_TrqCons'])
|
||
|
|
||
|
def read_HighSpeed(self):
|
||
|
self.general_read2(self.host_fids['Read_HighSpeed'])
|
||
|
return self.check_response(self.dyn_fids['Is_HighSpeed'])
|
||
|
|
||
|
def read_HighAccel(self):
|
||
|
self.general_read2(self.host_fids['Read_HighAccel'])
|
||
|
return self.check_response(self.dyn_fids['Is_HighAccel'])
|
||
|
|
||
|
def read_Pos_OnRange(self):
|
||
|
self.general_read2(self.host_fids['Read_Pos_OnRange'])
|
||
|
return self.check_response(self.dyn_fids['Is_Pos_OnRange'])
|
||
|
|
||
|
def read_GearNumber(self):
|
||
|
self.general_read2(self.host_fids['Read_GearNumber'])
|
||
|
return self.check_response(self.dyn_fids['Is_GearNumber'])
|
||
|
|
||
|
def read_Status(self):
|
||
|
self.general_read2(self.host_fids['Read_Drive_Status'])
|
||
|
return self.check_response(self.dyn_fids['Is_Status'])
|
||
|
|
||
|
def read_Config(self):
|
||
|
self.general_read2(self.host_fids['Read_Drive_Config'])
|
||
|
return self.check_response(self.dyn_fids['Is_Config'])
|
||
|
|
||
|
def read_AbsPos32(self):
|
||
|
self.general_read(self.dyn_fids['Is_AbsPos32'])
|
||
|
return self.check_response(self.dyn_fids['Is_AbsPos32'])
|
||
|
|
||
|
def read_TrqCurrent(self):
|
||
|
self.general_read(self.dyn_fids['Is_TrqCurrent'])
|
||
|
return self.check_response(self.dyn_fids['Is_TrqCurrent'])
|
||
|
|
||
|
def set_Config(self):
|
||
|
func_id2 = self.host_fids['Set_Drive_Config']
|
||
|
# Toggling the enable bit (b5) does not effect the drive (only tested in analog mode)
|
||
|
cnf = ((0 << 6) | # TBD
|
||
|
(1 << 5) | # b5 = 0 : let Drive servo
|
||
|
# b5 = 1 : let Drive free, motor could be turned freely
|
||
|
(0x01 << 3) | # b4 b3 = 0 : Position servo as default
|
||
|
# 1 : Speed servo
|
||
|
# 2 : Torque servo
|
||
|
# 3 : TBD
|
||
|
(0 << 2) | # b2 = 0 : works as relative mode(default) like normal optical encoder
|
||
|
# b2 = 1 : works as absolute position system, motor will back to absolute zero or POS2(Stored in
|
||
|
# sensor) automatically after power on reset.
|
||
|
(0x03 << 0)) # b1 b0 = 0 : RS232 mode
|
||
|
# 1 : CW,CCW mode
|
||
|
# 2 : Pulse/Dir or (SPI mode Optional)
|
||
|
# 3 : Anlog mode
|
||
|
packet = [0x00 | self.drive_id,
|
||
|
0x80 | (0 << 5) | func_id2,
|
||
|
0x80 | cnf]
|
||
|
|
||
|
packet += [0x80 | (sum(packet) & 0x7f)]
|
||
|
|
||
|
if self.debug:
|
||
|
print([hex(x) for x in packet])
|
||
|
|
||
|
packet = bytearray(packet)
|
||
|
n = self.serial.write(packet)
|
||
|
|
||
|
if n != len(packet):
|
||
|
raise DMMExceptionTruncatedWrite(n, len(packet))
|
||
|
|
||
|
def check_response(self, expected_func_id, max_attempts=3):
|
||
|
for i in range(max_attempts):
|
||
|
func_id, v = self.read_response()
|
||
|
if func_id == expected_func_id:
|
||
|
break
|
||
|
if i == max_attempts:
|
||
|
raise DMMExceptionUnexpectedFunc()
|
||
|
|
||
|
return v
|
||
|
|
||
|
def general_read(self, func_id):
|
||
|
self.verify_func_id(func_id)
|
||
|
|
||
|
packet = [0x00 | self.drive_id,
|
||
|
0x80 | (0 << 5) | 0x0e,
|
||
|
0x80 | (func_id & 0x7f)]
|
||
|
packet += [0x80 | (sum(packet) & 0x7f)]
|
||
|
|
||
|
if self.debug:
|
||
|
print([hex(x) for x in packet])
|
||
|
|
||
|
packet = bytearray(packet)
|
||
|
n = self.serial.write(packet)
|
||
|
|
||
|
if n != len(packet):
|
||
|
raise DMMExceptionTruncatedWrite(n, len(packet))
|
||
|
|
||
|
def read_response(self):
|
||
|
if self.debug:
|
||
|
print('read response')
|
||
|
|
||
|
self.serial.timeout = .05
|
||
|
|
||
|
arr = []
|
||
|
first_byte = False
|
||
|
expected_len = 0
|
||
|
timed_out = False
|
||
|
while True:
|
||
|
x = self.serial.read(1)
|
||
|
if len(x) == 0:
|
||
|
# timeout occured
|
||
|
timed_out = True
|
||
|
break
|
||
|
x = ord(x)
|
||
|
first_byte = (x & 0x80) == 0
|
||
|
if first_byte:
|
||
|
if self.debug:
|
||
|
print('first byte')
|
||
|
first_byte = True
|
||
|
expected_len = 0
|
||
|
arr = []
|
||
|
|
||
|
arr += [x]
|
||
|
if len(arr) == 2:
|
||
|
expected_len = 4 + ((x >> 5) & 0x03)
|
||
|
|
||
|
# x = x & 0x7f
|
||
|
if self.debug:
|
||
|
print(hex(x))
|
||
|
|
||
|
if len(arr) == expected_len:
|
||
|
break
|
||
|
|
||
|
if timed_out:
|
||
|
raise DMMTimeout()
|
||
|
|
||
|
if self.debug:
|
||
|
print(expected_len, len(arr), [hex(x) for x in arr])
|
||
|
|
||
|
def verify_length(arr, expected_n):
|
||
|
n = (arr[1] >> 5) & 0x03
|
||
|
# print(hex(arr[1]), n, expected_n)
|
||
|
if n != expected_n:
|
||
|
raise DMMExceptionUnexpectedLength(n, expected_n)
|
||
|
|
||
|
if arr and expected_len == len(arr):
|
||
|
drive_id = arr[0] & 0x7f
|
||
|
func_id = arr[1] & 0x1f
|
||
|
|
||
|
crc_check = ((sum(arr[:-1]) ^ arr[-1]) & 0x7f) == 0
|
||
|
if self.debug:
|
||
|
print(crc_check)
|
||
|
print(hex(func_id))
|
||
|
if crc_check:
|
||
|
if 0x00 <= func_id <= 0x0a or 0x1e < func_id:
|
||
|
print('Unallowed address read:', func_id)
|
||
|
elif func_id == 0x10:
|
||
|
# Is_MainGain
|
||
|
verify_length(arr, 0)
|
||
|
x = arr[2] & 0x7f
|
||
|
|
||
|
elif func_id == 0x11:
|
||
|
# Is_SpeedGain
|
||
|
verify_length(arr, 0)
|
||
|
x = arr[2] & 0x7f
|
||
|
|
||
|
elif func_id == 0x12:
|
||
|
# Is_IntGain
|
||
|
verify_length(arr, 0)
|
||
|
x = arr[2] & 0x7f
|
||
|
|
||
|
elif func_id == 0x13:
|
||
|
# Is_TrqCons
|
||
|
verify_length(arr, 0)
|
||
|
x = arr[2] & 0x7f
|
||
|
|
||
|
elif func_id == 0x14:
|
||
|
# Is_HighSpeed
|
||
|
verify_length(arr, 0)
|
||
|
x = arr[2] & 0x7f
|
||
|
|
||
|
elif func_id == 0x15:
|
||
|
# Is_HighAccel
|
||
|
verify_length(arr, 0)
|
||
|
x = arr[2] & 0x7f
|
||
|
|
||
|
elif func_id == 0x16:
|
||
|
# Is_Drive_ID
|
||
|
verify_length(arr, 0)
|
||
|
x = arr[2] & 0x7f
|
||
|
|
||
|
elif func_id == 0x17:
|
||
|
# Is_Pos_OnRange
|
||
|
verify_length(arr, 0)
|
||
|
x = arr[2] & 0x7f
|
||
|
|
||
|
elif func_id == 0x18:
|
||
|
# Is_GearNumber
|
||
|
verify_length(arr, 3)
|
||
|
# self.debug = True
|
||
|
# print([hex(x) for x in arr])
|
||
|
# x = self.read_signed_val(arr)
|
||
|
# self.debug = False
|
||
|
x = [((arr[2] & 0x7f) << 7) | (arr[3] & 0x7f),
|
||
|
((arr[4] & 0x7f) << 7) | (arr[5] & 0x7f)]
|
||
|
|
||
|
elif func_id == 0x19:
|
||
|
# Is_Status
|
||
|
verify_length(arr, 0)
|
||
|
x = arr[2] & 0x7f
|
||
|
d = {}
|
||
|
|
||
|
if x & (1 << 0):
|
||
|
d['in position'] = '0 (motor busy, or |Pset - Pmotor|> OnRange)'
|
||
|
d['in position'] = '0'
|
||
|
else:
|
||
|
d['in position'] = '1 (On position, i.e. |Pset - Pmotor| < = OnRange)'
|
||
|
d['in position'] = '1'
|
||
|
|
||
|
if x & (1 << 1):
|
||
|
d['motor'] = 'motor free'
|
||
|
else:
|
||
|
d['motor'] = 'motor servo'
|
||
|
|
||
|
x2 = (x >> 2) & 0x7
|
||
|
if x2 == 0:
|
||
|
d['alarm'] = 'No alarm'
|
||
|
d['alarm'] = ''
|
||
|
elif x2 == 1:
|
||
|
d['alarm'] = 'motor lost phase alarm, |Pset - Pmotor|>8192(steps), 180(deg)'
|
||
|
d['alarm'] = 'lost phase'
|
||
|
elif x2 == 2:
|
||
|
d['alarm'] = 'motor over current alarm'
|
||
|
d['alarm'] = 'over current'
|
||
|
elif x2 == 3:
|
||
|
d['alarm'] = 'motor overheat alarm, or motor over power'
|
||
|
d['alarm'] = 'overheat or over power'
|
||
|
elif x2 == 4:
|
||
|
d['alarm'] = 'there is error for CRC code check, refuse to accept current command'
|
||
|
d['alarm'] = 'corrupt command'
|
||
|
elif x2 > 4:
|
||
|
d['alarm'] = 'TBD'
|
||
|
|
||
|
if x & (1 << 5):
|
||
|
d['motion'] = 'busy (means built in S-curve, linear, circular motion is busy on current motion)'
|
||
|
d['motion'] = 'busy'
|
||
|
else:
|
||
|
d[
|
||
|
'motion'] = 'completed (means built in S-curve, linear, circular motion completed; waiting for next motion)'
|
||
|
d['motion'] = 'completed'
|
||
|
|
||
|
if x & (1 << 6):
|
||
|
d['pin2'] = '1 (pin2 status of JP3,used for Host PC to detect CNC zero position or others)'
|
||
|
d['pin2'] = '1'
|
||
|
else:
|
||
|
d['pin2'] = '0 (pin2 status of JP3,used for Host PC to detect CNC zero position or others)'
|
||
|
d['pin2'] = '0'
|
||
|
|
||
|
x = d
|
||
|
|
||
|
elif func_id == 0x1a:
|
||
|
# Is_Config
|
||
|
verify_length(arr, 0)
|
||
|
x = arr[2] & 0x7f
|
||
|
print(hex(x))
|
||
|
d = {}
|
||
|
|
||
|
x2 = x & 0x03
|
||
|
if x2 == 0:
|
||
|
d['input mode'] = 'RS232 mode'
|
||
|
d['input mode'] = 'RS232'
|
||
|
elif x2 == 1:
|
||
|
d['input mode'] = 'CW/CCW mode'
|
||
|
d['input mode'] = 'CW/CCW'
|
||
|
elif x2 == 2:
|
||
|
d['input mode'] = 'Pulse/Dir or (SPI mode Optional)'
|
||
|
d['input mode'] = 'pulse/dir'
|
||
|
elif x2 == 3:
|
||
|
d['input mode'] = 'Analog mode'
|
||
|
d['input mode'] = 'analog'
|
||
|
|
||
|
if x & (1 << 2):
|
||
|
d['positioning'] = 'works as absolute position system, motor will back to absolute zero or POS2(Stored in sensor) automatically after power on reset.'
|
||
|
d['positioning'] = 'absolute'
|
||
|
else:
|
||
|
d['positioning'] = 'relative mode(default) like normal optical encoder'
|
||
|
d['positioning'] = 'relative'
|
||
|
|
||
|
x2 = (x >> 3) & 0x03
|
||
|
if x2 == 0:
|
||
|
d['servo mode'] = 'Position servo as default'
|
||
|
d['servo mode'] = 'position'
|
||
|
elif x2 == 1:
|
||
|
d['servo mode'] = 'Speed servo'
|
||
|
d['servo mode'] = 'speed'
|
||
|
elif x2 == 2:
|
||
|
d['servo mode'] = 'Torque servo'
|
||
|
d['servo mode'] = 'torque'
|
||
|
elif x2 == 3:
|
||
|
d['servo mode'] = 'TBD'
|
||
|
|
||
|
if x & (1 << 5):
|
||
|
d['enabled'] = 'let drive servo'
|
||
|
d['enabled'] = 'yes'
|
||
|
else:
|
||
|
d['enabled'] = 'let Drive free, motor could be turned freely'
|
||
|
d['enabled'] = 'no'
|
||
|
|
||
|
if x & (1 << 6):
|
||
|
d['b6'] = '1 TBD'
|
||
|
else:
|
||
|
d['b6'] = '0 TBD'
|
||
|
|
||
|
x = d
|
||
|
|
||
|
elif func_id == 0x1b:
|
||
|
# Is_AbsPos32
|
||
|
x = self.read_signed_val(arr)
|
||
|
|
||
|
elif func_id in [0x1c, 0x1d]:
|
||
|
print('Unknown address read:', func_id)
|
||
|
|
||
|
elif func_id == 0x1e:
|
||
|
# Is_TrqCurrent
|
||
|
x = self.read_signed_val(arr)
|
||
|
|
||
|
return func_id, x
|
||
|
|
||
|
def read_signed_val(self, arr):
|
||
|
arr2 = arr[2:-1]
|
||
|
|
||
|
x = sign_extend(arr2[0] << 1, 8) >> 1
|
||
|
if self.debug:
|
||
|
print('first byte', [hex(arr2[0]), hex(x), x])
|
||
|
|
||
|
for y in arr2[1:]:
|
||
|
x = (x << 7) + (y & 0x7f)
|
||
|
|
||
|
if self.debug:
|
||
|
print(x, [hex(x_) for x_ in arr2])
|
||
|
|
||
|
return x
|
||
|
|
||
|
def measure_speed(self, integration_time=.1):
|
||
|
p1 = self.read_AbsPos32()
|
||
|
t1 = time.time()
|
||
|
time.sleep(integration_time)
|
||
|
p2 = self.read_AbsPos32()
|
||
|
t2 = time.time()
|
||
|
encoder_ppr = 65536.
|
||
|
rpm = (p2 - p1) / (t2 - t1) * 60. / encoder_ppr
|
||
|
return rpm
|
||
|
|
||
|
def set_speed(self, rpm):
|
||
|
packet = [0x00 | self.drive_id,
|
||
|
0x80 | (3 << 5) | 0x0a,
|
||
|
0x80 | ((rpm >> (7 * 3)) & 0x7f),
|
||
|
0x80 | ((rpm >> (7 * 2)) & 0x7f),
|
||
|
0x80 | ((rpm >> (7 * 1)) & 0x7f),
|
||
|
0x80 | ((rpm >> (7 * 0)) & 0x7f)]
|
||
|
packet += [0x80 | (sum(packet) & 0x7f)]
|
||
|
|
||
|
if self.debug:
|
||
|
print([hex(x) for x in packet])
|
||
|
|
||
|
packet = bytearray(packet)
|
||
|
n = self.serial.write(packet)
|
||
|
|
||
|
if n != len(packet):
|
||
|
raise DMMExceptionTruncatedWrite(n, len(packet))
|
||
|
|
||
|
def integrate_TrqCurrent(self, max_dt=1.):
|
||
|
st = time.time()
|
||
|
dt = 0.
|
||
|
arr = []
|
||
|
while dt < max_dt:
|
||
|
arr += [self.read_TrqCurrent()]
|
||
|
dt = time.time() - st
|
||
|
|
||
|
# = [dmm.read_TrqCurrent() for _ in range(100)]
|
||
|
# print(arr)
|
||
|
d = {'min': np.min(arr), 'max': np.max(arr), 'mean': np.mean(arr), 'median': np.median(arr),
|
||
|
'stddev': np.std(arr)}
|
||
|
print(d)
|
||
|
|
||
|
arr = np.abs(arr)
|
||
|
d = {'min': np.min(arr), 'max': np.max(arr), 'mean': np.mean(arr), 'median': np.median(arr),
|
||
|
'stddev': np.std(arr)}
|
||
|
print(d)
|
||
|
|
||
|
def update_TrqCurrent(self, max_dt=1.):
|
||
|
st = time.time()
|
||
|
dt = 0.
|
||
|
st10 = time.time()
|
||
|
self.torque_arr += [(st, self.read_TrqCurrent())]
|
||
|
st11 = time.time()
|
||
|
print('round trip:', st11 - st10)
|
||
|
dt = time.time() - st
|
||
|
|
||
|
self.torque_arr = [v for v in self.torque_arr if (st - v[0]) <= max_dt]
|
||
|
print('len(arr):', len(self.torque_arr))
|
||
|
|
||
|
# = [dmm.read_TrqCurrent() for _ in range(100)]
|
||
|
# print(self.torque_arr)
|
||
|
arr = [v[1] for v in self.torque_arr]
|
||
|
d = {'min': np.min(arr), 'max': np.max(arr), 'mean': np.mean(arr), 'median': np.median(arr),
|
||
|
'stddev': np.std(arr)}
|
||
|
print(d)
|
||
|
|
||
|
arr = np.abs(arr)
|
||
|
d = {'min': np.min(arr), 'max': np.max(arr), 'mean': np.mean(arr), 'median': np.median(arr),
|
||
|
'stddev': np.std(arr)}
|
||
|
print(d)
|
||
|
|
||
|
arr = [v[1] for v in self.torque_arr]
|
||
|
return np.mean(arr), arr[-1]
|
||
|
|
||
|
|
||
|
def find_device():
|
||
|
devs = []
|
||
|
|
||
|
return '/dev/ttyUSB0'
|
||
|
|
||
|
global serial
|
||
|
if serial.VERSION > '2.5':
|
||
|
import serial.tools.list_ports
|
||
|
|
||
|
for dev in serial.tools.list_ports.comports():
|
||
|
if dev.description == 'FT230X Basic UART' and dev.manufacturer == 'FTDI' and dev.product == 'FT230X Basic UART':
|
||
|
devs += [dev.device]
|
||
|
else:
|
||
|
# LinuxCNC 2.7.15 is still tied to Python2
|
||
|
import glob
|
||
|
devs = glob.glob('/dev/ttyUSB*')
|
||
|
|
||
|
if not devs:
|
||
|
print('No known serial devices found.')
|
||
|
return ''
|
||
|
|
||
|
if len(devs) > 1:
|
||
|
print('More than one serial devices found...')
|
||
|
for dev in devs:
|
||
|
print(dev)
|
||
|
print('Selecting first serial device.')
|
||
|
|
||
|
print('Using device at:', devs[0])
|
||
|
return devs[0]
|
||
|
|
||
|
|
||
|
def serial_loop(dev_fn):
|
||
|
with DMMDrive(dev_fn, 0) as dmm:
|
||
|
d = {}
|
||
|
d['Status'] = dmm.read_Status()
|
||
|
|
||
|
d['MainGain'] = dmm.read_MainGain()
|
||
|
d['SpeedGain'] = dmm.read_SpeedGain()
|
||
|
d['IntGain'] = dmm.read_IntGain()
|
||
|
d['TrqCons'] = dmm.read_TrqCons()
|
||
|
d['HighSpeed'] = dmm.read_HighSpeed()
|
||
|
d['HighAccel'] = dmm.read_HighAccel()
|
||
|
d['Pos_OnRange'] = dmm.read_Pos_OnRange()
|
||
|
d['GearNumber'] = dmm.read_GearNumber()
|
||
|
d['Status'] = dmm.read_Status()
|
||
|
d['Config'] = dmm.read_Config()
|
||
|
d['AbsPos32'] = dmm.read_AbsPos32()
|
||
|
d['TrqCurrent'] = dmm.read_TrqCurrent()
|
||
|
|
||
|
if True:
|
||
|
dmm.set_speed(-30)
|
||
|
|
||
|
time.sleep(.5)
|
||
|
d['Speed'] = dmm.measure_speed()
|
||
|
|
||
|
print(d)
|
||
|
|
||
|
while True:
|
||
|
time.sleep(.5)
|
||
|
print(dmm.measure_speed())
|
||
|
|
||
|
dmm.integrate_TrqCurrent()
|
||
|
|
||
|
|
||
|
def main():
|
||
|
try:
|
||
|
while True:
|
||
|
dev_fn = find_device()
|
||
|
if dev_fn:
|
||
|
try:
|
||
|
serial_loop(dev_fn)
|
||
|
except DMMTimeout:
|
||
|
print('Timedout')
|
||
|
# raise
|
||
|
except serial.serialutil.SerialException as e:
|
||
|
# SerialException: could not open port /dev/ttyUSB1: [Errno 2] No such file or directory: '/dev/ttyUSB1'
|
||
|
# SerialException: device reports readiness to read but returned no data (device disconnected?)
|
||
|
print('SerialException:', e)
|
||
|
print('Kicked out... resting before retrying.')
|
||
|
time.sleep(1)
|
||
|
except KeyboardInterrupt:
|
||
|
pass
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|