Examples
CPX-AP
startup
"""Example code for CPX-AP startup and system information"""
# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp
# for cpx_ap, the attached modules are found automatically
with CpxAp(ip_address="192.168.1.1") as myCPX:
# view the system documentation by going to the apdd path on your system directory
# this documentation is updated every time the CpxAp object with this ip address is
# instantiated. It gives an overview of all parameters and functions of each module
print(myCPX.docu_path)
# alternatively you can print out the system information in the console
myCPX.print_system_information()
# to get an overview of all available parameters, there is a function that iterates
# over every module and reads out the parameters and channels if available
myCPX.print_system_state()
parameter read/write
"""Example code for CPX-AP parameter read and write"""
# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp
# for cpx_ap, the attached modules are found automatically
with CpxAp(ip_address="192.168.1.1") as myCPX:
# Read the automatically generated documentation on your system folder
# It gives an overview of all parameters and functions of each module
print(myCPX.docu_path)
print("--------------------\n")
# to get an overview of all available parameters, there is a function that iterates
# over every module and reads out the parameters and channels if available
myCPX.print_system_state()
print("--------------------\n")
# to read or write an individual parameter, you can either call it by ID or name,
# ID is a bit faster. You can get both ID and name from the print_system_state function
# or from the documentation or print the available parameters from each module. If the
# value has enums as value, it will return the enum strings instead of the integer.
module = myCPX.modules[1]
for parameter in module.module_dicts.parameters.values():
# if you want to read it and print the correct unit of the value, you can get the unit
# from the parameter. If you want to get the (if available) enum string, use
# read_module_parameter_enum_str() instead.
print(
parameter.name,
module.read_module_parameter(parameter.parameter_id),
parameter.unit,
)
print("--------------------\n")
# writing a parameter is almost the same, you can check if the parameter is writable by
# the R/W tag or get it from the parameter. You should also check the datatype of the
# parameter to reduce unwanted behaviours. If the parameter allows enums as value you can
# either use the enum strings (you can get them by printing them from the parameter)
# try this example with a cpx-ap digital input module on index 1 or adapt VALUE to the
# correct value for your module parameter
module = myCPX.modules[1]
PARAM_VALUE = "3ms"
PARAM_ID = 20014
for parameter in module.module_dicts.parameters.values():
print(
parameter.parameter_id,
parameter.name,
"\tIs writeable: ",
parameter.is_writable,
"\tData type: ",
parameter.data_type,
"\tEnums: ",
parameter.enums,
)
# adapt this to your found parameter
module.write_module_parameter(PARAM_ID, PARAM_VALUE)
digital input
"""Example code for CPX-AP digital input"""
# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp
# for cpx_ap, the attached modules are found automatically
with CpxAp(ip_address="192.168.1.1") as myCPX:
# Read the automatically generated documentation on your system folder
# It gives an overview of all parameters and functions of each module
print(myCPX.docu_path)
# module index 0 is 'CPX-AP-*-EP' (* can be I or A)
# The first IO module (index 1) is 'CPX-AP-I-8DI-M8-3P'
# read digital input on channel 0
# access by index
myCPX.modules[1].read_channel(0)
# or access by automatically generated name, you can see the default names
# in the documentation but you can rename the modules anytime you like
myCPX.modules[1].name = "cpxap8di"
myCPX.cpxap8di.read_channel(0)
# you can also assign it to a custom object
myIO = myCPX.modules[1]
myIO.read_channel(0)
# you can read all channels at once, this returns a list of bool
myIO.read_channels()
# configure the module
# you can read/write the parameters from the module. Check the parameter_read_write example
# for detailed information. The Input Debounce Time parameter will return an enum string.
debounce_time = myIO.read_module_parameter_enum_str("Input Debounce Time")
digital output
"""Example code for CPX-AP digital output"""
# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp
# for cpx_ap, the attached modules are found automatically
with CpxAp(ip_address="192.168.1.1") as myCPX:
# Read the automatically generated documentation on your system folder
# It gives an overview of all parameters and functions of each module
print(myCPX.docu_path)
# module index 0 is 'CPX-AP-*-EP' (* can be I or A)
# in this example the first IO module (index 1) is 'CPX-AP-*-4DI4DO-M12-5P'
# same as with every module, you can access it by the module index or the automatically
# generated name (get it from documentation) as well as rename it (see digital_input example)
dido = myCPX.modules[1]
# keep in mind that the outputs will switch on and off as fast as your pc allows. It is very
# likely, that you will not see any change on the outputs before the code runs through and
# the connection is closed. If you actually want to see something in this example, I suggest
# using the sleep function from the time module to wait. Keep in mind that this will only work
# if you disable the modbus timeout by passing timeout=0 in the CpxAp constructor.
# set, reset, toggle a digital output
dido.set_channel(0) # sets one channel, returns none
dido.reset_channel(0) # reset one channel, returns none
dido.toggle_channel(0) # toggle the state of one channel, returns none
# sets all output channels [0, 1, 2, 3] to the values of the list and returns None
dido.write_channels([True, True, False, False])
# read back the values of all channels. Consists of 4 input channels and 4 output channels
dido.read_channels()
# reads back the first input channel
dido.read_channel(0)
# reads back the first output channel, same as "read_channel(4)"
dido.read_output_channel(0)
# configure the module. Check what parameters are available in the documentation or read
# them from the module (see parameter_read_write example)
for parameter in dido.module_dicts.parameters.values():
print(parameter)
# sets debounce time to 10ms
dido.write_module_parameter("Input Debounce Time", "10ms")
# sets Load supply monitoring inactive, you can access also by ID (is a bit faster than name)
dido.write_module_parameter(20022, "Load supply monitoring inactive")
# hold last state on the outputs, you can also set it by the integer value instead of the enum
dido.write_module_parameter(20052, 1)
simple diagnosis
"""Example code for CPX-AP simple diagnosis"""
# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp
# for cpx_ap, the attached modules are found automatically
with CpxAp(ip_address="192.168.1.1") as myCPX:
# iterate over each module and print out the available diagnosis object
for index, module in enumerate(myCPX.modules):
diagnosis = module.read_diagnosis_information()
# don't print if diagnosis == None
if diagnosis:
print(index, diagnosis)
io link with sdas
"""Example code for CPX-AP io-link"""
# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp
# for CpxAp, the attached modules are found automatically
with CpxAp(ip_address="192.168.1.1") as myCPX:
# Read the automatically generated documentation on your system folder
# It gives an overview of all parameters and functions of each module
print(myCPX.docu_path)
SDAS_CHANNEL = 0
io_link_master = myCPX.modules[4]
# print available parameters or look at documentation
for parameter in io_link_master.module_dicts.parameters.values():
print(parameter)
# set operating mode of channel 0 to "IO-Link communication"
io_link_master.write_module_parameter("Port Mode", "IOL_AUTOSTART", SDAS_CHANNEL)
# read back the port status information to check if it's "OPERATE"
# wait for it to become "OPERATE"
param = io_link_master.read_fieldbus_parameters()
while param[SDAS_CHANNEL]["Port status information"] != "OPERATE":
param = io_link_master.read_fieldbus_parameters()
# read the channel 0 process data. This takes the device information and returns
# only relevant bytes (2)
sdas_data = io_link_master.read_channel(SDAS_CHANNEL)
# the process data consists of 4 ssc bits and 12 bit pdv (see datasheet sdas)
# you can convert the bytes data to integer first and then do bitwise operations
process_data_int = int.from_bytes(sdas_data, byteorder="big")
ssc1 = bool(process_data_int & 0x1)
ssc2 = bool(process_data_int & 0x2)
ssc3 = bool(process_data_int & 0x4)
ssc4 = bool(process_data_int & 0x8)
pdv = (process_data_int & 0xFFF0) >> 4
io link isdu read/write with sdas
"""Example code for CPX-AP io-link isdu access"""
# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp
# for CpxAp, the attached modules are found automatically
with CpxAp(ip_address="192.168.1.1") as myCPX:
# Read the automatically generated documentation on your system folder
# It gives an overview of all parameters and functions of each module
print(myCPX.docu_path)
SDAS_CHANNEL = 0
io_link_master = myCPX.modules[4]
# setup the master for communication (see example_cpxap_iolink_sdas)
io_link_master.write_module_parameter("Port Mode", "IOL_AUTOSTART", SDAS_CHANNEL)
param = io_link_master.read_fieldbus_parameters()
while param[SDAS_CHANNEL]["Port status information"] != "OPERATE":
param = io_link_master.read_fieldbus_parameters()
# Read raw isdu value into a single channel as parameter
ret = io_link_master.read_isdu(SDAS_CHANNEL, 0x0010) # subindex is optional
# without specifying the data_type, this returns the raw value as a bytes object. Since in
# IO-Link strings are encoded "msb first", no change is required in decoding.
# For example:
print(ret.decode("ascii"))
# Read isdu with data_type defined with channel in a list
ret = io_link_master.read_isdu([SDAS_CHANNEL], 0x0010, data_type="str")
# If you know the expected datatype, you can specify it and get the isdu data
# interpreted correctly. ret will now be a python str type.
print(ret)
# Write isdu raw (bytes) into a single channel as parameter
io_link_master.write_isdu(b"FESTO", SDAS_CHANNEL, 0x0018)
# this will write b"FESTO" to the isdu, can be read back with:
ret = io_link_master.read_isdu(SDAS_CHANNEL, 0x0018)
print(ret.decode("ascii"))
# Write isdu with datatype defined with channel in a list
io_link_master.write_isdu("FESTO", [SDAS_CHANNEL], 0x0018)
# this will write a string "FESTO" to the isdu with correct interpretation of the
# datatype. This can be read back with:
ret = io_link_master.read_isdu([SDAS_CHANNEL], 0x0018, data_type="str")
print(ret)
io link with ehps
"""Example code for CPX-AP IO-Link master with gripper EHPS"""
import struct
import time
# import the libraries
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp
# process data as dict
def read_process_data_in(data):
    """Decode the EHPS "process data in" bytes and return them as a dict.

    The data is unpacked as three big-endian unsigned 16 bit values: a status
    word, the error number and the actual position. Each status flag is one
    bit of the status word.
    """
    # ehps provides 3 x UIntegerT16 "process data in" according to datasheet,
    # struct unpacks them in one go
    status_word, error_number, actual_position = struct.unpack(">HHH", data)

    # flag name and its bit position inside the status word
    flag_bits = (
        ("Error", 15),
        ("DirectionCloseFlag", 14),
        ("DirectionOpenFlag", 13),
        ("LatchDataOk", 12),
        ("UndefinedPositionFlag", 11),
        ("ClosedPositionFlag", 10),
        ("GrippedPositionFlag", 9),
        ("OpenedPositionFlag", 8),
        ("Ready", 6),
    )

    decoded = {name: bool(status_word & (1 << bit)) for name, bit in flag_bits}
    decoded["ErrorNumber"] = error_number
    decoded["ActualPosition"] = actual_position
    return decoded
# list of some connected modules. IO-Link module is specified with 8 bytes per port:
# Datasheet: Per port: 8 E / 8 A Module: 32 E / 32 A. This has to be set up with the
# switch on the module.
with CpxAp(ip_address="192.168.1.1") as myCPX:
# example EHPS-20-A-LK on port 1
EHPS_CHANNEL = 1
# (optional) get the module and assign it to a new variable for easy access
io_link_master = myCPX.modules[5]
# set operating mode of the EHPS channel to "IO-Link communication"
io_link_master.write_module_parameter("Port Mode", "IOL_AUTOSTART", EHPS_CHANNEL)
time.sleep(0.05)
# (optional) read line-state, should now be "OPERATE" for the channel
param = io_link_master.read_fieldbus_parameters()
while param[EHPS_CHANNEL]["Port status information"] != "OPERATE":
param = io_link_master.read_fieldbus_parameters()
# read process data
raw_data = io_link_master.read_channel(EHPS_CHANNEL)
# interpret it according to datasheet
process_data_in = read_process_data_in(raw_data)
# demo of process data out needed to initialize EHPS
control_word = 0x0001 # latch
gripping_mode = 0x46 # universal
workpiece_no = 0x00
gripping_position = 0x03E8
gripping_force = 0x03 # ca. 85%
gripping_tolerance = 0x0A
pd_list = [
control_word,
gripping_mode,
workpiece_no,
gripping_position,
gripping_force,
gripping_tolerance,
]
# init
process_data_out = struct.pack(">HBBHBB", *pd_list)
io_link_master.write_channel(EHPS_CHANNEL, process_data_out)
time.sleep(0.05)
# Open command: 0x0100
pd_list[0] = 0x0100
process_data_out = struct.pack(">HBBHBB", *pd_list)
io_link_master.write_channel(EHPS_CHANNEL, process_data_out)
# wait for the process data in to change to "opened"
while not process_data_in["OpenedPositionFlag"]:
process_data_in = read_process_data_in(io_link_master.read_channel(EHPS_CHANNEL))
time.sleep(0.05)
# Close command 0x 0200
pd_list[0] = 0x0200
process_data_out = struct.pack(">HBBHBB", *pd_list)
io_link_master.write_channel(EHPS_CHANNEL, process_data_out)
# wait for the process data in to change to "closed"
while not process_data_in["ClosedPositionFlag"]:
process_data_in = read_process_data_in(io_link_master.read_channel(EHPS_CHANNEL))
time.sleep(0.05)
io_link_master.write_module_parameter("Port Mode", "DEACTIVATED", EHPS_CHANNEL)
# wait for inactive
param = io_link_master.read_fieldbus_parameters()
while param[EHPS_CHANNEL]["Port status information"] != "DEACTIVATED":
param = io_link_master.read_fieldbus_parameters()
io link switch variant
"""Example code for CPX-AP IO-Link master variant switch"""
import struct
import time
# import the libraries
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp
with CpxAp(ip_address="192.168.1.1") as myCPX:
print(myCPX.modules[5])
# (optional) get the module and assign it to a new variable for easy access
io_link_master = myCPX.modules[5]
io_link_master.change_variant("CPX-AP-I-4IOL-M12 Variant 16")
# alternatively, change by module code
# io_link_master.change_variant(8210)
# We have to reinitialize the myCPX object to update the system information and new
# process data length
with CpxAp(ip_address="192.168.1.1") as myCPX:
print(myCPX.modules[5])
io_link_master = myCPX.modules[5]
# now we can use the new variant of the IO-Link master
io_link_master.write_module_parameter("Port Mode", "IOL_AUTOSTART", 1)
fault detection and threading
"""Example code for cyclic access with python threading"""
import time
import threading
# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp
class ThreadedCpx(CpxAp):
    """CpxAp that polls the diagnostic status from a background thread.

    The polling thread is started in ``__init__`` and stopped in ``__exit__``,
    so the class is meant to be used as a context manager. Code outside the
    thread should hold ``lock`` while it accesses the connection and can check
    ``status_error`` to find out whether the thread has seen a fault.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to CpxAp and start the polling thread."""
        super().__init__(*args, **kwargs)
        # lock for modbus transmissions
        self.lock = threading.Lock()
        # error flag for error handling outside this thread
        self.status_error = threading.Event()
        # set in __exit__ to ask the polling thread to stop
        self.exit_event = threading.Event()
        # start the thread
        self.continous_thread = threading.Thread(target=self.status_update)
        self.continous_thread.start()

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop the polling thread and then close the connection.

        Must not be called while ``lock`` is held by the calling thread,
        because the polling thread may be waiting for that lock.
        """
        self.exit_event.set()
        # wait for the thread to finish its current cycle, otherwise it could
        # still be reading while the connection is being closed
        self.continous_thread.join()
        return super().__exit__(exc_type, exc_value, traceback)

    def status_update(self):
        """Continuous thread that reads the faults in a cyclic access."""
        counter = 0
        while not self.exit_event.is_set():
            # lock the thread when accessing the connection
            with self.lock:
                module_status = self.read_diagnostic_status()

            faulty_modules = [
                index
                for index, status in enumerate(module_status)
                if status.degree_of_severity_error
            ]
            if faulty_modules:
                print(f"Status Error in module(s): {faulty_modules}")
                self.status_error.set()

            # pause between two cycles; waiting on the event instead of
            # sleeping lets the thread react to the exit request immediately
            self.exit_event.wait(0.05)
            # a counter to show that the thread is running
            print(counter)
            counter += 1
class ErrorEventCpx(CpxAp):
    """CpxAp that reports module faults through the cycle_time feature.

    Instead of running its own thread, this class overrides ``perform_io``.
    Faults are signalled through the ``status_error`` event.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to CpxAp and create the error event."""
        # the thread is started automatically, no lock for modbus
        # transmissions is required
        # NOTE(review): the attributes below are created after
        # super().__init__() - confirm perform_io() cannot run before that
        super().__init__(*args, **kwargs)
        self.status_error = threading.Event()
        # counts the calls of perform_io to show that it is running
        self.counter = 0

    def perform_io(self):
        """Overwrite perform_io function, which is called periodically in a thread.
        It has to perform some input/output with the device in order to reset
        the modbus timeout, e.g. read_diagnostic_status()"""
        module_status = self.read_diagnostic_status()
        faulty_modules = [
            index
            for index, status in enumerate(module_status)
            if status.degree_of_severity_error
        ]
        if faulty_modules:
            print(f"Status Error in module(s): {faulty_modules}")
            self.status_error.set()

        print(self.counter)
        self.counter += 1
# main task with acyclic access
def main():
    """Run the acyclic main task, first with ThreadedCpx, then with ErrorEventCpx."""
    ip_address = "192.168.1.1"

    # variant 1: own polling thread, every access is guarded by the lock
    with ThreadedCpx(ip_address=ip_address) as cpx:
        # your main application here...
        for _ in range(3):
            # acyclic communication with locking
            with cpx.lock:
                apdd_information = cpx.read_apdd_information(0)
                channel_values = cpx.modules[1].read_channels()

            # error handling
            if cpx.status_error.is_set():
                print("ERROR handling ...")

            # jobs outside the cpx access go here so they don't block the
            # cyclic thread
            print(apdd_information)
            print(channel_values)
            time.sleep(10)

    # variant 2: the cycle_time allows this with a simpler construct
    with ErrorEventCpx(ip_address=ip_address, timeout=1, cycle_time=0.05) as cpx:
        for _ in range(3):
            print(cpx.read_apdd_information(0))
            print(cpx.modules[1].read_channels())

            # error handling
            if cpx.status_error.is_set():
                print("ERROR handling ...")
            time.sleep(10)

    print("End of main")
if __name__ == "__main__":
main()
timeout and cycle_time
"""Example code for CPX-AP timeout and cycle_time"""
import time
# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp
class CustomPerfomIo(CpxAp):
    """CpxAp whose periodic perform_io prints the channels of module 1"""

    def perform_io(self):
        """Overwrite perform_io function, which is called periodically in a thread.
        It has to perform some input/output with the device in order to reset
        the modbus timeout, here by reading the channels of module 1"""
        channel_values = self.modules[1].read_channels()
        print(channel_values)
# main task with acyclic access
def main():
    """Show timeout and cycle_time with the default and with a custom perform_io"""
    connection = {"ip_address": "192.168.1.1", "timeout": 0.5, "cycle_time": 0.05}

    with CpxAp(**connection) as cpx:
        print(cpx.read_diagnostic_status())
        # the modbus connection does not time out although we wait 2 seconds,
        # which is longer than the timeout of 0.5 seconds
        time.sleep(2)
        print(cpx.read_diagnostic_status())

    # It is possible to overwrite the function that is called cyclically in
    # the thread
    with CustomPerfomIo(**connection) as cpx:
        time.sleep(2)
if __name__ == "__main__":
main()
io-link manual connection and diagnosis
"""Example code for diagnosis with CPX-AP"""
import time
# Import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp
with CpxAp(ip_address="192.168.1.1") as myCPX:
# optional: for easy access, assign the IO-Link module to an object
myIOL = myCPX.cpx_ap_a_4iol_m12_variant_16
SDAS_CHANNEL = 2 # this assumes you have a SDAS sensor on channel 2
# Write the validation setting
myIOL.write_module_parameter(
"Validation & Backup", "Type compatible Device V1.1", SDAS_CHANNEL
)
# Write the IO-Link device ID and Vendor ID to the IO-Link module
# You can read these with read_fieldbus_parameters()[SDAS_CHANNEL]
# In this example, we write a wrong ID on purpose (it should be 12)
# so that an error occurs and we can read it out.
myIOL.write_module_parameter("DeviceID", 13, SDAS_CHANNEL)
myIOL.write_module_parameter("Nominal Vendor ID", 333, SDAS_CHANNEL)
# Set the Port Mode to IOL_MANUAL. This will compare the written DeviceID
# with the actual device ID and the Nominal Vendor ID with the actual vendor
# ID. If they do not match, the "Port status information" of the fieldbus
# parameters will change to "PORT_DIAG"
myIOL.write_module_parameter("Port Mode", "IOL_MANUAL", SDAS_CHANNEL)
time.sleep(0.05) # give it some time to set everything
param = myIOL.read_fieldbus_parameters()
print(param[SDAS_CHANNEL])
# If everything works, this loop will run until the port goes into OPERATE
# mode. But in our example it will go to PORT_DIAG. We catch this with the
# following if statement and print the diagnosis information from the module
while param[SDAS_CHANNEL]["Port status information"] != "OPERATE":
param = myIOL.read_fieldbus_parameters()
if param[SDAS_CHANNEL]["Port status information"] == "PORT_DIAG":
print(myIOL.read_diagnosis_information())
break
Simple composition example to illustrate a possible abstraction of a more complex CPX system
# Import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp
# This is your personal class. It doesn't inherit from anything!
class PersonalClass:
    """Personal wrapper that owns a CpxAp system (composition, no inheritance)."""

    def __init__(self, ip_address="192.168.1.1"):
        """Initialize the CpxAp system; pass down the arguments that you want."""
        print("initialize the personal class")
        self.cpx = CpxAp(ip_address=ip_address)

    def __enter__(self):
        """Return the instance itself so it can be used in a context manager."""
        print("enter the personal class")
        return self

    def __exit__(self, *args):
        """Shut down the cpx object when the context is left."""
        print("clean up CpxAp")
        # clean up everything
        self.cpx.shutdown()

    def pressurize(self, valve_number: int):
        """Example of a personal function: set channel `valve_number` of module 1.

        This is the place for more complex computation and evaluation of
        different ap modules.
        """
        target_module = self.cpx.modules[1]
        target_module.set_channel(valve_number)
# Demo to show that everything works correctly
# if you run this script it will create a personal class but then raise a ZeroDivisionError.
# You can see from the prints in the console how it behaves and that the use of the context
# manager of PersonalClass is working and shutting down the cpx correctly
if __name__ == "__main__":
# simply use the pressurize command
with PersonalClass() as personal:
personal.pressurize(1)
with PersonalClass() as personal:
i = 1 / 0
print("This is never reached")
CPX-E
object with typecode
"""Example code for CPX-E with typecode"""
# import the library
from cpx_io.cpx_system.cpx_e.cpx_e import CpxE
# use the typecode to setup all attached modules
with CpxE("60E-EP-MLNINO", ip_address="192.168.1.1") as myCPX:
# read system information
module_list = myCPX.modules
# the modules are all named automatically and one can access them by their name or index
module_0 = myCPX.modules[0] # access by index
module_1 = myCPX.cpxe8do # access by name (automatically generated)
# rename modules (also see example_cpxe_add_module.py)
myCPX.modules[0].name = "ep_module" # access by index
myCPX.cpxe8do.name = "digital_output_module" # access by name
object with module list
"""Example code for CPX-E with list of modules"""
# import the libraries
from cpx_io.cpx_system.cpx_e.cpx_e import CpxE
from cpx_io.cpx_system.cpx_e.eep import CpxEEp
from cpx_io.cpx_system.cpx_e.e16di import CpxE16Di
# use list of modules to call CpxE
modules = [CpxEEp(), CpxE16Di()]
with CpxE(ip_address="192.168.1.1", modules=modules) as myCPX:
# read system information
module_list = myCPX.modules
# the modules are all named automatically and one can access them by their name or index
module_0 = myCPX.modules[0] # access by index
module_1 = myCPX.cpxe16di # access by name (automatically generated)
# rename modules (also see example_cpxe_add_module.py)
myCPX.modules[0].name = "ep_module" # access by index
myCPX.cpxe16di.name = "digital_input_module" # access by name
empty object and add modules later
"""Example code for CPX-E add module"""
# import the libraries
from cpx_io.cpx_system.cpx_e.cpx_e import CpxE
from cpx_io.cpx_system.cpx_e.e16di import CpxE16Di
from cpx_io.cpx_system.cpx_e.e8do import CpxE8Do
with CpxE(ip_address="192.168.1.1") as myCPX:
# add modules (the order must be from left to right in the cpx-e system,
# the first module -EP is already added at position 0)
myCPX.add_module(CpxE16Di())
myCPX.add_module(CpxE8Do())
# read system information
module_list = myCPX.modules
# the modules are all named automatically and one can access them by their name or index
module_0 = myCPX.modules[0] # access by index
module_1 = myCPX.cpxe16di # access by name (automatically generated)
module_2 = myCPX.cpxe8do # access by name (automatically generated)
# rename modules
myCPX.modules[0].name = "ep_module" # access by index
myCPX.cpxe16di.name = "digital_input_module" # access by name
myCPX.cpxe8do.name = "digital_output_module" # access by name
# rename them again
myCPX.modules[0].name = "my_ep_module" # access by index
myCPX.digital_input_module.name = "my_digital_input_module" # access by name
myCPX.digital_output_module.name = "my_digital_output_module" # access by name
digital input
"""Example code for CPX-E digital input"""
# import the libraries
from cpx_io.cpx_system.cpx_e.cpx_e import CpxE
from cpx_io.cpx_system.cpx_e.eep import CpxEEp
from cpx_io.cpx_system.cpx_e.e16di import CpxE16Di
# use a list of modules to set up all attached modules
with CpxE(ip_address="192.168.1.1", modules=[CpxEEp(), CpxE16Di()]) as myCPX:
# the modules are all named automatically and one can access them by their name or index
# read all channels
myCPX.cpxe16di.read_channels()
# read one channel
myCPX.cpxe16di.read_channel(0)
# configure the diagnostics of the module
myCPX.cpxe16di.configure_diagnostics(True)
# configure the power reset
myCPX.cpxe16di.configure_power_reset(True)
# configure input debounce time
myCPX.cpxe16di.configure_debounce_time(2)
digital output
"""Example code for CPX-E digital output"""
# import the libraries
from cpx_io.cpx_system.cpx_e.cpx_e import CpxE
from cpx_io.cpx_system.cpx_e.eep import CpxEEp
from cpx_io.cpx_system.cpx_e.e8do import CpxE8Do
# use a list of modules to set up all attached modules
with CpxE(ip_address="192.168.1.1", modules=[CpxEEp(), CpxE8Do()]) as myCPX:
# the modules are all named automatically and one can access them by their name or index
# read all channels
myCPX.cpxe8do.read_channels()
# read one channel
myCPX.cpxe8do.read_channel(0)
# write all channels at once
data = [False] * 8
myCPX.cpxe8do.write_channels(data)
# set and reset or toggle the state of one channel
myCPX.cpxe8do.set_channel(0)
myCPX.cpxe8do.reset_channel(0)
myCPX.cpxe8do.toggle_channel(0)
# configure the diagnostics of the module
myCPX.cpxe8do.configure_diagnostics(short_circuit=False, undervoltage=False)
# configure the power reset
myCPX.cpxe8do.configure_power_reset(True)
io-link sensor (sdas)
"""Example code for CPX-E IO-Link master with sensor SDAS"""
# import the libraries
from cpx_io.cpx_system.cpx_e.cpx_e import CpxE
from cpx_io.cpx_system.cpx_e.eep import CpxEEp
from cpx_io.cpx_system.cpx_e.e4iol import CpxE4Iol
# use a list of modules to set up all attached modules
# adapt the address_space according to your setup (dip switches)
# default is 2 bytes per port
with CpxE(
ip_address="192.168.1.1",
modules=[CpxEEp(), CpxE4Iol()],
) as myCPX:
# read system information
module_list = myCPX.modules
SDAS_CHANNEL = 0
# set operating mode of channel 0 to "IO-Link communication"
myCPX.cpxe4iol.configure_operating_mode(3, SDAS_CHANNEL)
# read line-state, should now be "OPERATE"
param = myCPX.cpxe4iol.read_line_state()
# read the channel 0 process data
sdas_data = myCPX.cpxe4iol.read_channel(SDAS_CHANNEL)
# the process data consists of 4 ssc bits and 12 bit pdv (see datasheet sdas)
# you can convert the bytes data to integer first and then do bitwise operations
process_data_int = int.from_bytes(sdas_data, byteorder="big")
ssc1 = bool(process_data_int & 0x1)
ssc2 = bool(process_data_int & 0x2)
ssc3 = bool(process_data_int & 0x4)
ssc4 = bool(process_data_int & 0x8)
pdv = (process_data_int & 0xFFF0) >> 4
# you can also read the device error or other parameters
myCPX.cpxe4iol.read_device_error(0)
io-link actuator (ehps)
"""Example code for CPX-E IO-Link master with gripper EHPS"""
import struct
import time
# import the libraries
from cpx_io.cpx_system.cpx_e.cpx_e import CpxE
from cpx_io.cpx_system.cpx_e.eep import CpxEEp
from cpx_io.cpx_system.cpx_e.e4iol import CpxE4Iol
# process data as dict
def read_process_data_in(data):
    """Decode the EHPS "process data in" bytes and return them as a dict.

    The data is unpacked as four big-endian unsigned 16 bit values. The first
    three are the status word, the error number and the actual position; the
    fourth one is expected to be 0.
    """
    # ehps provides 3 x UIntegerT16 "process data in" according to datasheet,
    # struct unpacks them together with the unused fourth value
    status_word, error_number, actual_position, unused = struct.unpack(
        ">HHHH", data
    )
    # the last 16 bit value is always 0 since ehps only uses 3*16 bit
    assert unused == 0

    # bit position of every flag inside the status word
    bit_of_flag = {
        "Error": 15,
        "DirectionCloseFlag": 14,
        "DirectionOpenFlag": 13,
        "LatchDataOk": 12,
        "UndefinedPositionFlag": 11,
        "ClosedPositionFlag": 10,
        "GrippedPositionFlag": 9,
        "OpenedPositionFlag": 8,
        "Ready": 6,
    }

    result = {
        flag: bool((status_word >> bit) & 1) for flag, bit in bit_of_flag.items()
    }
    result["ErrorNumber"] = error_number
    result["ActualPosition"] = actual_position
    return result
# list of some connected modules. IO-Link module is specified with 8 bytes per port:
# Datasheet: Per port: 8 E / 8 A Module: 32 E / 32 A. This has to be set up with the
# switch on the module.
with CpxE(
    ip_address="192.168.1.1",
    modules=[CpxEEp(), CpxE4Iol(8)],
) as myCPX:
    # read system information
    module_list = myCPX.modules

    # example EHPS-20-A-LK on port 1
    EHPS_CHANNEL = 1

    # (optional) get the module and assign it to a new variable for easy access
    e4iol = myCPX.cpxe4iol

    # (optional) you can reset the power for each channel
    e4iol.configure_pl_supply(False, EHPS_CHANNEL)
    e4iol.configure_ps_supply(False)
    time.sleep(0.05)
    e4iol.configure_pl_supply(True, EHPS_CHANNEL)
    e4iol.configure_ps_supply(True)
    time.sleep(0.05)

    # set operating mode of the EHPS channel to "IO-Link communication"
    e4iol.configure_operating_mode(3, channel=EHPS_CHANNEL)
    time.sleep(0.05)

    # (optional) read line-state, should now be "OPERATE" for the channel
    param = e4iol.read_line_state()

    # read process data
    raw_data = e4iol.read_channel(EHPS_CHANNEL)

    # interpret it according to datasheet
    process_data_in = read_process_data_in(raw_data)

    # demo of process data out needed to initialize EHPS
    control_word_msb = 0x00
    control_word_lsb = 0x01  # latch
    gripping_mode = 0x46  # universal
    workpiece_no = 0x00
    gripping_position = 0x03E8
    gripping_force = 0x03  # ca. 85%
    gripping_tolerance = 0x0A

    # the process data out as 4 * 16 bit values, data[0] is the control word
    data = [
        control_word_lsb + (control_word_msb << 8),
        workpiece_no + (gripping_mode << 8),
        gripping_position,
        gripping_tolerance + (gripping_force << 8),
    ]

    # init
    # pack the 4*16bit values to bytes
    process_data_out = struct.pack(">HHHH", *data)
    e4iol.write_channel(EHPS_CHANNEL, process_data_out)
    time.sleep(0.05)

    # Open command: 0x0100
    # the control word has to be changed in the list that is packed afterwards;
    # the packed bytes object itself is immutable
    data[0] = 0x0100
    process_data_out = struct.pack(">HHHH", *data)
    e4iol.write_channel(EHPS_CHANNEL, process_data_out)

    # wait for the process data in to change to "opened"
    while not process_data_in["OpenedPositionFlag"]:
        process_data_in = read_process_data_in(e4iol.read_channel(EHPS_CHANNEL))
        time.sleep(0.05)

    # Close command: 0x0200
    data[0] = 0x0200
    process_data_out = struct.pack(">HHHH", *data)
    e4iol.write_channel(EHPS_CHANNEL, process_data_out)

    # wait for the process data in to change to "closed"
    while not process_data_in["ClosedPositionFlag"]:
        process_data_in = read_process_data_in(e4iol.read_channel(EHPS_CHANNEL))
        time.sleep(0.05)
fault detection and threading
"""Example code for cyclic access with python threading"""
import time
import threading
# import the library
from cpx_io.cpx_system.cpx_e.cpx_e import CpxE
from cpx_io.cpx_system.cpx_e.e16di import CpxE16Di
class ThreadedCpx(CpxE):
    """CpxE that polls the fault detection from a background thread.

    The polling thread is started in ``__init__`` and stopped in ``__exit__``,
    so the class is meant to be used as a context manager. Code outside the
    thread should hold ``lock`` while it accesses the connection and can check
    ``status_error`` to find out whether the thread has seen a fault.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to CpxE and start the polling thread."""
        super().__init__(*args, **kwargs)
        # lock for modbus transmissions
        self.lock = threading.Lock()
        # error flag for error handling outside this thread
        self.status_error = threading.Event()
        # set in __exit__ to ask the polling thread to stop
        self.exit_event = threading.Event()
        # start the thread
        self.continous_thread = threading.Thread(target=self.status_update)
        self.continous_thread.start()

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop the polling thread and then close the connection.

        Must not be called while ``lock`` is held by the calling thread,
        because the polling thread may be waiting for that lock.
        """
        self.exit_event.set()
        # wait for the thread to finish its current cycle, otherwise it could
        # still be reading while the connection is being closed
        self.continous_thread.join()
        return super().__exit__(exc_type, exc_value, traceback)

    def status_update(self):
        """Continuous thread that reads the faults in a cyclic access."""
        counter = 0
        while not self.exit_event.is_set():
            # lock the thread when accessing the connection
            with self.lock:
                module_status = self.read_fault_detection()

            faulty_modules = [
                index for index, fault in enumerate(module_status) if fault
            ]
            if faulty_modules:
                print(f"Status Error in module(s): {faulty_modules}")
                self.status_error.set()

            # pause between two cycles; waiting on the event instead of
            # sleeping lets the thread react to the exit request immediately
            self.exit_event.wait(0.05)
            # a counter to show that the thread is running
            print(counter)
            counter += 1
# main task with acyclic access
def main():
    """Main thread with acyclic access to a CPX-E system polled by ThreadedCpx."""
    with ThreadedCpx(ip_address="192.168.1.1") as myCPX:
        # the status thread is already running at this point, so adding the
        # module follows the same locking rule as every other access
        # NOTE(review): assumes add_module may communicate with the device
        with myCPX.lock:
            myCPX.add_module(CpxE16Di())

        # your main application here...
        for _ in range(3):
            # acyclic communication with locking
            with myCPX.lock:
                information = myCPX.read_device_identification()
                channels = myCPX.modules[1].read_channels()

            # error handling
            if myCPX.status_error.is_set():
                print("ERROR handling ...")

            # jobs outside the cpx access can go here so they don't block the
            # cyclic thread
            print(information)
            print(channels)
            time.sleep(10)

    print("End of main")
if __name__ == "__main__":
main()