Examples

CPX-AP

startup

"""Example code for CPX-AP startup and system information"""

# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp

# for cpx_ap, the attached modules are found automatically
with CpxAp(ip_address="192.168.1.1") as myCPX:
    # view the system documentation by going to the apdd path on your system directory
    # this documentation is updated everytime the CpxAp Object with this ip address is
    # instanciated. It gives an overview of all parameters and functions of each module
    print(myCPX.docu_path)

    # alternatively you can print out the system information in the console
    myCPX.print_system_information()

    # to get an overview of all available parameters, there is a function that iterates
    # over every module and reads out the parameters and channels if available
    myCPX.print_system_state()

Download Example

parameter read/write

"""Example code for CPX-AP parameter read and write"""

# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp

# for cpx_ap, the attached modules are found automatically
with CpxAp(ip_address="192.168.1.1") as myCPX:
    # Read the automatically generated documentation on your system folder
    # It gives an overview of all parameters and functions of each module
    print(myCPX.docu_path)
    print("--------------------\n")

    # to get an overview of all available parameters, there is a function that iterates
    # over every module and reads out the parameters and channels if available
    myCPX.print_system_state()
    print("--------------------\n")

    # to read or write an individual parameter, you can either call it by ID or name,
    # ID is a bit faster. You can get both ID and name from the print_system_state function
    # or from the documentation or print the available parameters from each module. If the
    # value has enums as value, it will return the enum strings instead of the integer.
    module = myCPX.modules[1]
    for parameter in module.module_dicts.parameters.values():
        # if you want to read it and print the correct unit of the value, you can get the unit
        # from the parameter. If you want to get the (if available) enum string, use
        # read_module_parameter_enum_str() instead.
        print(
            parameter.name,
            module.read_module_parameter(parameter.parameter_id),
            parameter.unit,
        )
    print("--------------------\n")

    # writing a parameter is almost the same, you can check if the parameter is writable by
    # the R/W tag or get it from the parameter. You should also check the datatype of the
    # parameter to reduce unwanted behaviours. If the parameter allows enums as value you can
    # either use the enum strings (you can get them by printing them from the parameter)

    # try this example with a cpx-ap digital input module on index 1 or adapt VALUE to the
    # correct value for your module parameter
    module = myCPX.modules[1]
    PARAM_VALUE = "3ms"
    PARAM_ID = 20014

    for parameter in module.module_dicts.parameters.values():
        print(
            parameter.parameter_id,
            parameter.name,
            "\tIs writeable: ",
            parameter.is_writable,
            "\tData type: ",
            parameter.data_type,
            "\tEnums: ",
            parameter.enums,
        )

    # adapt this to your found parameter
    module.write_module_parameter(PARAM_ID, PARAM_VALUE)

Download Example

digital input

"""Example code for CPX-AP digital input"""

# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp

# for cpx_ap, the attached modules are found automatically
with CpxAp(ip_address="192.168.1.1") as myCPX:
    # Read the automatically generated documentation on your system folder
    # It gives an overview of all parameters and functions of each module
    print(myCPX.docu_path)

    # module index 0 is 'CPX-AP-*-EP' (* can be I or A)
    # The first IO module (index 1) is 'CPX-AP-I-8DI-M8-3P'

    # read digital input on channel 0
    # access by index
    myCPX.modules[1].read_channel(0)

    # or access by automatically generated name, you can see the default names
    # in the documentation but you can rename the modules anytime you like
    myCPX.modules[1].name = "cpxap8di"
    myCPX.cpxap8di.read_channel(0)

    # you can also assign it to a custom object
    myIO = myCPX.modules[1]
    myIO.read_channel(0)

    # you can read all channels at once, this returns a list of bool
    myIO.read_channels()

    # configure the module
    # you can read/write the parameters from the module. Check the parameter_read_write example
    # for detailed information. The Input Debounce Time parameter will return an enum string.
    debounce_time = myIO.read_module_parameter_enum_str("Input Debounce Time")

Download Example

digital output

"""Example code for CPX-AP digital output"""

# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp

# for cpx_ap, the attached modules are found automatically
with CpxAp(ip_address="192.168.1.1") as myCPX:
    # Read the automatically generated documentation on your system folder
    # It gives an overview of all parameters and functions of each module
    print(myCPX.docu_path)

    # module index 0 is 'CPX-AP-*-EP' (* can be I or A)
    # in this example the first IO module (index 1) is 'CPX-AP-*-4DI4DO-M12-5P'
    # same as with every module, you can access it by the module index or the automatically
    # generated name (get it from documentation) as well as rename it (see digital_input example)

    dido = myCPX.modules[1]

    # keep in mind that the outputs will switch on and off as fast as your pc allows. It is very
    # likely, that you will not see any change on the outputs before the code runs through and
    # the connection is closed. If you actually want to see something in this example, I suggest
    # using the sleep function from the time module to wait. Keep in mind that this will only work
    # if you disable the modbus timeout by passing timeout=0 in the CpxAp constructor.

    # set, reset, toggle a digital output
    dido.set_channel(0)  # sets one channel, returns none
    dido.reset_channel(0)  # reset one channel, returns none
    dido.toggle_channel(0)  # toggle the state of one channel, returns none

    # sets all channel to list values [0,1,2,3] and returns none
    dido.write_channels([True, True, False, False])

    # read back the values of all channels. Consists of 4 input channels and 4 output channels
    dido.read_channels()

    # reads back the first input channel
    dido.read_channel(0)
    # reads back the first output channel, same as "read_channel(4)"
    dido.read_output_channel(0)

    # configure the module. Check what parameters are available in the documentation or read
    # them from the module (see parameter_read_write example)
    for parameter in dido.module_dicts.parameters.values():
        print(parameter)

    # sets debounce time to 10ms
    dido.write_module_parameter("Input Debounce Time", "10ms")
    # sets Load supply monitoring inactive, you can access also by ID (is a bit faster than name)
    dido.write_module_parameter(20022, "Load supply monitoring inactive")
    # hold last state on the outputs, you can also set it by the integer value instead of the enum
    dido.write_module_parameter(20052, 1)

Download Example

simple diagnosis

"""Example code for CPX-AP startup and system information"""

# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp

# for cpx_ap, the attached modules are found automatically
with CpxAp(ip_address="192.168.1.1") as myCPX:

    # iterate over each module and print out the available diagnosis object
    for index, module in enumerate(myCPX.modules):
        diagnosis = module.read_diagnosis_information()

        # don't print if diagnosis == None
        if diagnosis:
            print(index, diagnosis)

Download Example

fault detection and threading

"""Example code for cyclic access with python threading"""

import time
import threading

# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp


class ThreadedCpx(CpxAp):
    """Class for threaded CpxAp"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # lock for modbus transmissions
        self.lock = threading.Lock()
        # error flag for error handling outside this thread
        self.status_error = threading.Event()
        self.exit_event = threading.Event()
        # start the thread
        self.continous_thread = threading.Thread(target=self.status_update)
        self.continous_thread.start()

    def __exit__(self, exc_type, exc_value, traceback):
        # exit the thread and then close the connection
        self.exit_event.set()
        return super().__exit__(exc_type, exc_value, traceback)

    def status_update(self):
        """continous thread to read faults in a cyclic access"""
        counter = 0
        while not self.exit_event.is_set():
            # lock the thread when accessing the connection
            with self.lock:
                module_status = self.read_diagnostic_status()

            if any(status.degree_of_severity_error for status in module_status):
                print(
                    f"Status Error in module(s): "
                    f"{[i for i, s in enumerate(module_status) if s.degree_of_severity_error]}"
                )
                self.status_error.set()

            time.sleep(0.05)
            # a counter to show that the thread is running
            print(counter)
            counter += 1


class ErrorEventCpx(CpxAp):
    """Class for error event using the internal cycle_time feature"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # no lock for modbus transmissions required
        self.status_error = threading.Event()
        # thread is automatically started
        # simulate counter from above
        self.counter = 0

    def perform_io(self):
        """Overwrite perform_io function, which is called periodically in a thread.
        It has to perform some input/output with the device in order to reset
        the modbus timeout, e.g. read_diagnostic_status()"""
        module_status = self.read_diagnostic_status()
        if any(status.degree_of_severity_error for status in module_status):
            print(
                f"Status Error in module(s): "
                f"{[i for i, s in enumerate(module_status) if s.degree_of_severity_error]}"
            )
            self.status_error.set()
        print(self.counter)
        self.counter += 1


# main task with acyclic access
def main():
    """main thread with acyclic access"""
    with ThreadedCpx(ip_address="192.168.1.1") as myCPX:

        # your main application here...
        for _ in range(3):
            # acyclic communication with locking
            with myCPX.lock:
                information = myCPX.read_apdd_information(0)
                channels = myCPX.modules[1].read_channels()

            # error handling
            if myCPX.status_error.is_set():
                print("ERROR handling ...")

            # jobs outside the cpx access can go here so they don't block the cyclic thread
            print(information)
            print(channels)

            time.sleep(10)

    # the cycle_time allows this with a simpler construct:
    with ErrorEventCpx(ip_address="192.168.1.1", timeout=1, cycle_time=0.05) as myCPX:
        for _ in range(3):
            print(myCPX.read_apdd_information(0))
            print(myCPX.modules[1].read_channels())

            # error handling
            if myCPX.status_error.is_set():
                print("ERROR handling ...")

            time.sleep(10)

    print("End of main")


if __name__ == "__main__":
    main()

Download Example

timeout and cycle_time

"""Example code for cyclic access with python threading"""

import time

# import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp


class CustomPerfomIo(CpxAp):
    """Class for error event using the internal cycle_time feature"""

    def perform_io(self):
        """Overwrite perform_io function, which is called periodically in a thread.
        It has to perform some input/output with the device in order to reset
        the modbus timeout, e.g. read_diagnostic_status()"""
        print(self.modules[1].read_channels())


# main task with acyclic access
def main():
    """main thread with acyclic access"""
    with CpxAp(ip_address="192.168.1.1", timeout=0.5, cycle_time=0.05) as myCPX:
        print(myCPX.read_diagnostic_status())
        # the modbus connection does not timeout although we wait 10 seconds
        time.sleep(2)
        print(myCPX.read_diagnostic_status())

    # It is possible to overwrite the function is called in the thread cyclicly
    with CustomPerfomIo(
        ip_address="192.168.1.1", timeout=0.5, cycle_time=0.05
    ) as myCPX:
        time.sleep(2)


if __name__ == "__main__":
    main()

Download Example

Simple composition example to illustrate a possible abstraction of a more complex CPX system

# Import the library
from cpx_io.cpx_system.cpx_ap.cpx_ap import CpxAp


# This is your personal class. It doesn't inherit from anything!
class PersonalClass:
    # Initialize your CpxAp System and configure it with your (default) parameters
    def __init__(
        self, ip_address="192.168.1.1"
    ):  # you can pass down the arguments that you want.
        print("initialize the personal class")
        self.cpx = CpxAp(ip_address=ip_address)

    # to be used in a context manager, add the enter and exit function (leave the enter as it is)
    def __enter__(self):
        print("enter the personal class")
        return self

    # be sure to shutdown the cpx object in the exit function.
    def __exit__(self, *args):
        print("clean up CpxAp")
        # clean up everything
        self.cpx.shutdown()

    # implement your personal functions. For example a pressurize function with VTUX
    def pressurize(self, valve_number: int):
        # do some complex computation and evaluation of different
        # ap modules
        self.cpx.modules[1].set_channel(valve_number)



# Demo to show that everything works correctly
# if you run this script it will create a personal class but then raise a ZeroDivisionError.
# You can see from the prints in the console how it behaves and that the use of the context
# manager of PersonalClass is working and shutting down the cpx correctly
if __name__ == "__main__":
    # simply use the pressurize command
    with PersonalClass() as personal:
        personal.pressurize(1)

    with PersonalClass() as personal:
        i = 1 / 0
        print("This is never reached")

Download Example

CPX-E

object with typecode

"""Example code for CPX-E with typecode"""

# import the library
from cpx_io.cpx_system.cpx_e.cpx_e import CpxE

# use the typecode to setup all attached modules
with CpxE("60E-EP-MLNINO", ip_address="192.168.1.1") as myCPX:
    # read system information
    module_list = myCPX.modules

    # the modules are all named automatically and one can access them by their name or index
    module_0 = myCPX.modules[0]  # access by index
    module_1 = myCPX.cpxe8do  # access by name (automatically generated)

    # rename modules (also see example_cpxe_add_module.py)
    myCPX.modules[0].name = "ep_module"  # access by index
    myCPX.cpxe8do.name = "digital_output_module"  # access by name

Download Example

object with module list

"""Example code for CPX-E with list of modules"""

# import the librarys
from cpx_io.cpx_system.cpx_e.cpx_e import CpxE
from cpx_io.cpx_system.cpx_e.eep import CpxEEp
from cpx_io.cpx_system.cpx_e.e16di import CpxE16Di

# use list of modules to call CpxE
modules = [CpxEEp(), CpxE16Di()]

with CpxE(ip_address="192.168.1.1", modules=modules) as myCPX:
    # read system information
    module_list = myCPX.modules

    # the modules are all named automatically and one can access them by their name or index
    module_0 = myCPX.modules[0]  # access by index
    module_1 = myCPX.cpxe16di  # access by name (automatically generated)

    # rename modules (also see example_cpxe_add_module.py)
    myCPX.modules[0].name = "ep_module"  # access by index
    myCPX.cpxe16di.name = "digital_input_module"  # access by name

Download Example

empty object and add modules later

"""Example code for CPX-E add module"""

# import the librarys
from cpx_io.cpx_system.cpx_e.cpx_e import CpxE
from cpx_io.cpx_system.cpx_e.e16di import CpxE16Di
from cpx_io.cpx_system.cpx_e.e8do import CpxE8Do

with CpxE(ip_address="192.168.1.1") as myCPX:
    # add modules (the order must be from left to right in the cpx-e system,
    # the first module -EP is already added at position 0)
    myCPX.add_module(CpxE16Di())
    myCPX.add_module(CpxE8Do())

    # read system information
    module_list = myCPX.modules

    # the modules are all named automatically and one can access them by their name or index
    module_0 = myCPX.modules[0]  # access by index
    module_1 = myCPX.cpxe16di  # access by name (automatically generated)
    module_2 = myCPX.cpxe8do  # access by name (automatically generated)

    # rename modules
    myCPX.modules[0].name = "ep_module"  # access by index
    myCPX.cpxe16di.name = "digital_input_module"  # access by name
    myCPX.cpxe8do.name = "digital_output_module"  # access by name

    # rename them again
    myCPX.modules[0].name = "my_ep_module"  # access by index
    myCPX.digital_input_module.name = "my_digital_input_module"  # access by name
    myCPX.digital_output_module.name = "my_digital_output_module"  # access by name

Download Example

digital input

"""Example code for CPX-E digital input"""

# import the librarys
from cpx_io.cpx_system.cpx_e.cpx_e import CpxE
from cpx_io.cpx_system.cpx_e.eep import CpxEEp
from cpx_io.cpx_system.cpx_e.e16di import CpxE16Di

# use the typecode to setup all attached modules
with CpxE(ip_address="192.168.1.1", modules=[CpxEEp(), CpxE16Di()]) as myCPX:
    # the modules are all named automatically and one can access them by their name or index

    # read all channels
    myCPX.cpxe16di.read_channels()

    # read one channel
    myCPX.cpxe16di.read_channel(0)

    # configure the diagnostics of the module
    myCPX.cpxe16di.configure_diagnostics(True)

    # configure the power reset
    myCPX.cpxe16di.configure_power_reset(True)

    # configure input debounce time
    myCPX.cpxe16di.configure_debounce_time(2)

Download Example

digital output

"""Example code for CPX-E digital output"""

# import the librarys
from cpx_io.cpx_system.cpx_e.cpx_e import CpxE
from cpx_io.cpx_system.cpx_e.eep import CpxEEp
from cpx_io.cpx_system.cpx_e.e8do import CpxE8Do

# use the typecode to setup all attached modules
with CpxE(ip_address="192.168.1.1", modules=[CpxEEp(), CpxE8Do()]) as myCPX:
    # the modules are all named automatically and one can access them by their name or index

    # read all channels
    myCPX.cpxe8do.read_channels()

    # read one channel
    myCPX.cpxe8do.read_channel(0)

    # write all channels at once
    data = [False] * 8
    myCPX.cpxe8do.write_channels(data)

    # set and reset or toggle the state of one channel
    myCPX.cpxe8do.set_channel(0)
    myCPX.cpxe8do.reset_channel(0)
    myCPX.cpxe8do.toggle_channel(0)

    # configure the diagnostics of the module
    myCPX.cpxe8do.configure_diagnostics(short_circuit=False, undervoltage=False)

    # configure the power reset
    myCPX.cpxe8do.configure_power_reset(True)

Download Example

fault detection and threading

"""Example code for cyclic access with python threading"""

import time
import threading

# import the library
from cpx_io.cpx_system.cpx_e.cpx_e import CpxE
from cpx_io.cpx_system.cpx_e.e16di import CpxE16Di


class ThreadedCpx(CpxE):
    """Class for threaded CpxE"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # lock for modbus transmissions
        self.lock = threading.Lock()
        # error flag for error handling outside this thread
        self.status_error = threading.Event()
        self.exit_event = threading.Event()
        # start the thread
        self.continous_thread = threading.Thread(target=self.status_update)
        self.continous_thread.start()

    def __exit__(self, exc_type, exc_value, traceback):
        # exit the thread and then close the connection
        self.exit_event.set()
        return super().__exit__(exc_type, exc_value, traceback)

    def status_update(self):
        """continous thread to read faults in a cyclic access"""
        counter = 0
        while not self.exit_event.is_set():
            # lock the thread when accessing the connection
            with self.lock:
                module_status = self.read_fault_detection()

            if any(module_status):
                print(
                    f"Status Error in module(s): "
                    f"{[i for i, s in enumerate(module_status) if s]}"
                )
                self.status_error.set()

            time.sleep(0.05)
            # a counter to show that the thread is running
            print(counter)
            counter += 1


# main task with acyclic access
def main():
    """main thread with acyclic access"""
    with ThreadedCpx(ip_address="192.168.1.1") as myCPX:
        myCPX.add_module(CpxE16Di())

        # your main application here...
        for _ in range(3):
            # acyclic communication with locking
            with myCPX.lock:
                information = myCPX.read_device_identification()
                channels = myCPX.modules[1].read_channels()

            # error handling
            if myCPX.status_error.is_set():
                print("ERROR handling ...")

            # jobs outside the cpx access can go here so they don't block the cyclic thread
            print(information)
            print(channels)

            time.sleep(10)

    print("End of main")


if __name__ == "__main__":
    main()

Download Example