Skip to content

vaem_communication

Festo VAEM backend communication module.

This module handles all communication under the hood, abstracting it away from the user.

VAEMModbusClient

Bases: ABC

Modbus Client Class.

Source code in src/vaem/vaem_communication.py
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
class VAEMModbusClient(ABC):
    """Modbus Client Class."""

    client: ModbusBaseSyncClient

    @abstractmethod
    def __init__(self, config: VAEMConfig):
        """
        VAEMModbusClient constructor.

        Abstract base class to build out VAEM clients.

        Args:
            config (VAEMConfig): A ModbusTCP or ModbusSerial
                    type to allow the driver to connect to the
                    correct communication interface.

        Returns:
            None
        """
        self._config = config
        self.version = None
        self._read_param = {
            "address": 0,
            "length": 0x07,
        }
        self._write_param = {
            "address": 0,
            "length": 0x07,
        }
        self._init_done = True
        self.error_handling_enabled = 1
        self.active_valves = [0, 0, 0, 0, 0, 0, 0, 0]

    def _get_transfer_value(self, operation, index, sub_index=0, transfer_value=None) -> dict:
        """
        Build the request dictionary for one VAEM parameter access.

        The data type defaults to UINT16 and is adjusted according to the
        numeric value of ``index``. An index outside the known set is logged
        as unsupported, but the request is still returned.

        Args:
            operation: Access code stored under ``access``
            index: Data object index for accessing VAEM register. Must be of type VaemIndex Enum class
            sub_index: Data object sub_index; often, the valve index for the VAEM
            transfer_value: The value to be transferred to the index:sub_index pair location on the VAEM
        Returns:
            dictionary with the keys ``access``, ``paramIndex``, ``paramSubIndex``,
            ``errorRet``, ``dataType`` and ``transferValue``
        """
        param_index = index.value
        request = {
            "access": operation,
            "paramIndex": param_index,
            "paramSubIndex": sub_index,
            "errorRet": 0,
            "dataType": VaemDataType.UINT16.value,
            "transferValue": transfer_value,
        }

        if param_index in (0x07, 0x08, 0x16, 0x2E):
            # 32-bit parameters (the original source labels this group "response time").
            request["dataType"] = VaemDataType.UINT32.value
        elif param_index in (0x09, 0x2D):
            # 8-bit parameters (the original source labels this group "operating mode").
            request["dataType"] = VaemDataType.UINT8.value
        elif param_index == 0x13:
            # For this index the caller's sub_index argument is sent as the
            # transfer value and the sub-index itself is forced to 0.
            request["dataType"] = VaemDataType.UINT8.value
            request["paramSubIndex"] = 0
            request["transferValue"] = sub_index
        elif param_index not in (0x01, 0x02, 0x04, 0x05, 0x06, 0x11):
            # The indices listed above keep the UINT16 default; anything else is unknown.
            logger.error("Currently unsupported input param")

        return request

    def _get_status(self, status_word) -> dict:
        """
        Gets the current status of the different parts of the VAEM.

        from the 15 bit status word returned by the VAEM.

        Args:
            status_word (int): 15 bit binary status word from VAEM

        Returns:
            Dictionary of values for each param
        """
        status = {}
        status["Status"] = status_word & 0x01
        status["Error"] = (status_word & 0x08) >> 3
        status["Readiness"] = (status_word & 0x10) >> 4
        status["OperatingMode"] = (status_word & 0xC0) >> 6
        status["Valve1"] = (status_word & 0x100) >> 8
        status["Valve2"] = (status_word & 0x200) >> 9
        status["Valve3"] = (status_word & 0x400) >> 10
        status["Valve4"] = (status_word & 0x800) >> 11
        status["Valve5"] = (status_word & 0x1000) >> 12
        status["Valve6"] = (status_word & 0x2000) >> 13
        status["Valve7"] = (status_word & 0x4000) >> 14
        status["Valve8"] = (status_word & 0x8000) >> 15
        return status

    def _construct_frame(self, data: dict) -> list:
        """
        Constructs data frame for transfer to VAEM device.

        Args:
            data (dict): Data to be sent to VAEM device
        Returns:
            list of values to be passed as the expected data type of the Modbus data frame
        """
        frame = []
        tmp = struct.pack(
            ">BBHBBQ",
            data["access"],
            data["dataType"],
            data["paramIndex"],
            data["paramSubIndex"],
            data["errorRet"],
            data["transferValue"],
        )
        try:
            for i in range(0, len(tmp) - 1, 2):
                frame.append((tmp[i] << 8) + tmp[i + 1])
        except ValueError as e:
            logger.error("Value error: %s. ", str(e))
        return frame

    def _deconstruct_frame(self, frame) -> dict:
        """
        Deconstructs incoming data frame from VAEM device.

        Args:
            frame: dict coming in from the device
        Returns:
            data: dictionary that contains the information from the dataframe.
        """
        data = {}
        if frame is not None:
            data["access"] = (frame[0] & 0xFF00) >> 8
            data["dataType"] = frame[0] & 0x00FF
            data["paramIndex"] = frame[1]
            data["paramSubIndex"] = (frame[2] & 0xFF00) >> 8
            data["errorRet"] = frame[2] & 0x00FF
            data["transferValue"] = 0
            for i in range(4):
                data["transferValue"] += frame[len(frame) - 1 - i] << (i * 16)

        return data

    def _transfer(self, write_data: list):
        """
        Method of transferring information from Python driver to device.

        Args:
            write_data: List of data that will be transferred to VAEM device
        Returns:
            Response from VAEM device.
        """
        if not self.client.connected:  # type: ignore[attr-defined, ty:unresolved-attribute]
            self.client.connect()
        try:
            data = self.client.readwrite_registers(  # type: ignore[missing-argument]
                read_address=self._read_param["address"],
                read_count=self._read_param["length"],
                write_address=self._write_param["address"],
                values=write_data,
                device_id=self._config.unit_id,
            )
            return data.registers
        except ModbusException as modbus_error:
            logger.error("Something went wrong with read opperation VAEM : %s", str(modbus_error))
        return None

    def _vaem_init(self):
        """
        Run the additional VAEM initialisation sequence.

        Writes operating mode ``OPMODE1`` to the device, calls
        ``clear_error()``, sets the init flag and stores the result of
        ``get_error_handling_status()`` on the instance. A ``ConnectionError``
        raised anywhere in that sequence is logged and not propagated.
        Nothing is sent when the init flag is not set; only a warning is logged.
        """
        if not self._init_done:
            logger.warning("No VAEM Connected!! CANNOT INITIALIZE")
            return

        try:
            # set operating mode
            request = self._get_transfer_value(
                VaemAccess.WRITE.value,
                VaemIndex.OPERATINGMODE,
                0,
                VaemOperatingMode.OPMODE1.value,
            )
            self._transfer(self._construct_frame(request))
            self.clear_error()
            self._init_done = True
            self.error_handling_enabled = self.get_error_handling_status()
        except ConnectionError as e:
            logger.error("Connection error: %s. ", str(e))

    def save_settings(self) -> None:
        """
        Ask the VAEM to save all parameters to non-volatile memory.

        Typical usage example:
            vaem.save_settings()

        Args:
            None

        Returns:
            None
        """
        if not self._init_done:
            logger.warning("No VAEM Connected!!")
            return

        # This request is assembled by hand rather than via _get_transfer_value:
        # it uses the UINT32 data type and the fixed transfer value 99999.
        request = {
            "access": VaemAccess.WRITE.value,
            "dataType": VaemDataType.UINT32.value,
            "paramIndex": VaemIndex.SAVEPARAMETERS.value,
            "paramSubIndex": 0,
            "errorRet": 0,
            "transferValue": 99999,
        }
        self._transfer(self._construct_frame(request))

    def select_valve(self, valve_id: int) -> None:
        """
        Add one valve to the set of selected valves on the VAEM.

        According to VAEM Logic all selected valves can be opened,
        others cannot with open command. The current selection is read from
        the device, this valve's ``vaemValveIndex`` entry is OR-ed into it and
        the result is written back. The valve is also marked in
        ``active_valves``.

        Typical usage example:
            valve_id = 1

            vaem.select_valve(valve_id = valve_id)

        Args:
            valve_id (int): The id of the valve to select (1-8)

        Returns:
            None

        Raises:
            ValueError: Valve index out of bounds
        """
        if not self._init_done:
            logger.warning("No VAEM Connected!!")
            return
        if valve_id not in range(1, 9):
            logger.error("Valve ID's have a range of 1-8, Inputted : %s", valve_id)
            raise ValueError(f"Valve index out of bounds: {valve_id}")

        # get currently selected valves
        query = self._get_transfer_value(
            VaemAccess.READ.value,
            VaemIndex.SELECTVALVE,
            vaemValveIndex[valve_id],
        )
        reply = self._transfer(self._construct_frame(query))

        # select new valve
        # NOTE(review): the mask is passed in the sub_index slot; presumably
        # SELECTVALVE is index 0x13, for which _get_transfer_value moves that
        # argument into the transfer value - confirm against VaemIndex.
        update = self._get_transfer_value(
            VaemAccess.WRITE.value,
            VaemIndex.SELECTVALVE,
            vaemValveIndex[valve_id] | self._deconstruct_frame(reply)["transferValue"],
        )
        self._transfer(self._construct_frame(update))
        self.active_valves[valve_id - 1] = 1

    def deselect_valve(self, valve_id: int) -> None:
        """
        Remove one valve from the set of selected valves on the VAEM.

        According to VAEM Logic all selected valves can be opened,
        others cannot with open command. The current selection is read from
        the device, this valve's ``vaemValveIndex`` entry is masked out of it
        and the result is written back. The valve is also cleared in
        ``active_valves``.

        Typical usage example:
            for _ in range (1, 9):

                vaem.deselect_valve(_)

        Args:
            valve_id (int): The ID of the valve to deselect. Valid numbers are from 1 to 8

        Returns:
            None

        Raises:
            ValueError: Valve index out of bounds
        """
        if not self._init_done:
            logger.warning("No VAEM Connected!!")
            return
        if valve_id not in range(1, 9):
            logger.error("Valve ID's have a range of 1-8, Inputted : %s", valve_id)
            raise ValueError(f"Valve index out of bounds: {valve_id}")

        # get currently selected valves
        query = self._get_transfer_value(
            VaemAccess.READ.value,
            VaemIndex.SELECTVALVE,
            vaemValveIndex[valve_id],
        )
        reply = self._transfer(self._construct_frame(query))

        # write the selection back with this valve's entry masked out
        update = self._get_transfer_value(
            VaemAccess.WRITE.value,
            VaemIndex.SELECTVALVE,
            self._deconstruct_frame(reply)["transferValue"] & (~(vaemValveIndex[valve_id])),
        )
        self._transfer(self._construct_frame(update))
        self.active_valves[valve_id - 1] = 0

    def set_valve_switching_time(self, valve_id: int, opening_time: int) -> None:
        """
        Sets the switching time for the specified valve.

        Typical usage example:
            valve_id = 1

            opening_time = 100

            vaem.set_valve_switching_time(valve_id = valve_id, opening_time = opening_time)

        Args:
            valve_id (int): ID number of the valve for configuration
            opening_time (int): Time in milliseconds of which the Valve with the ID will be opened

        Returns:
            None

        Raises:
            ValueError: Valve index out of bounds
        """
        if self._init_done:
            opening_time = int(opening_time / 0.2)
            if (opening_time in range(0, 9999999999999)) and (valve_id in range(1, 9)):
                data = self._get_transfer_value(
                    VaemAccess.WRITE.value,
                    VaemIndex.SWITCHINGTIME,
                    (valve_id - 1),
                    int(opening_time),
                )
                frame = self._construct_frame(data)
                self._transfer(frame)
            else:
                logger.error("Valve ID's have a range of 1-8, Inputted : %s", valve_id)
                raise ValueError
        else:
            logger.warning("No VAEM Connected!!")

    def open_selected_valves(self) -> None:
        """
        Opens all valves that are selected.

        Writes the ``STARTVALVESRESETERROR`` control word. When
        ``error_handling_enabled`` is truthy, a ``STOPVALVES`` control word is
        written first. Finishes by calling ``clear_error()``.

        Typical usage example:
            vaem.open_selected_valves()

        Args:
            None
        Returns:
            None
        """
        if not self._init_done:
            logger.warning("No VAEM Connected!!")
            return

        if self.error_handling_enabled:
            # Stop command goes out first when error handling is enabled.
            stop = self._get_transfer_value(
                VaemAccess.WRITE.value,
                VaemIndex.CONTROLWORD,
                0,
                VaemControlWords.STOPVALVES.value,
            )
            self._transfer(self._construct_frame(stop))

        # The start command is sent in either case.
        start = self._get_transfer_value(
            VaemAccess.WRITE.value,
            VaemIndex.CONTROLWORD,
            0,
            VaemControlWords.STARTVALVESRESETERROR.value,
        )
        self._transfer(self._construct_frame(start))

        # reset the control word
        self.clear_error()

    def open_valves(self, timings: dict[int, int]) -> None:
        """
        Selects and opens valves with specified actuation times.

        Typical usage example:
            valve_opening_times = {1: 100,
                                2: 100,
                                3: 100,
                                4: 100,
                                5: 100,
                                6: 100,
                                7: 100,
                                8: 100,
                                }

            vaem.open_valves(timings = valve_opening_times)

        Args:
            timings (dict): Dictionary of valve indices and actuation times

        Returns:
            None
        """
        for key, value in timings.items():
            self.select_valve(valve_id=key)
            self.set_valve_switching_time(valve_id=key, opening_time=value)
        self.open_selected_valves()

    def close_valves(self) -> None:
        """
        Closes valves that were previously selected.

        Writes the ``STOPVALVES`` control word and then calls ``clear_error()``.

        Typical usage example:
            vaem.close_valves()

        Args:
            None

        Returns:
            None
        """
        if not self._init_done:
            logger.warning("No VAEM Connected!!")
            return

        # stop the valves
        stop = self._get_transfer_value(
            VaemAccess.WRITE.value,
            VaemIndex.CONTROLWORD,
            0,
            VaemControlWords.STOPVALVES.value,
        )
        self._transfer(self._construct_frame(stop))
        self.clear_error()

    def get_status(self) -> dict:
        """
        Read the status of the VAEM.

        The status word is read from the device and decoded with
        ``_get_status``. The returned dictionary has the keys ``Status``,
        ``Error``, ``Readiness``, ``OperatingMode`` and ``Valve1`` to
        ``Valve8``. The decoded status is also written to the log at INFO level.

        Typical usage example:
            status = vaem.get_status()

            print(status)

        Args:
            None

        Returns:
            Dictionary of the status for the device; an empty dictionary when
            the init flag is not set. For more information, please refer to the
            VAEM Operation Instruction manual.
        """
        if not self._init_done:
            logger.warning("No VAEM Connected!!")
            return {}

        data = self._get_transfer_value(
            VaemAccess.READ.value,
            VaemIndex.STATUSWORD,
            0,
            0,
        )
        frame = self._construct_frame(data)
        resp = self._transfer(frame)
        # Decode once and reuse the result for both the log entry and the return value.
        status = self._get_status(self._deconstruct_frame(resp)["transferValue"])
        logger.info(status)
        return status

    def clear_error(self) -> None:
        """
        Write the ``RESETERRORS`` control word to the device.

        If any error occurs in valve opening, it must be cleared with this operation.

        Typical usage example:
            vaem.clear_error()

        Args:
            None

        Returns:
            None
        """
        if not self._init_done:
            logger.warning("No VAEM Connected!!")
            return

        reset = self._get_transfer_value(
            VaemAccess.WRITE.value,
            VaemIndex.CONTROLWORD,
            0,
            VaemControlWords.RESETERRORS.value,
        )
        self._transfer(self._construct_frame(reset))

    def set_inrush_current(self, valve_id: int, inrush_current: int) -> None:
        """
        Changes the inrush current for the valves based on valve ID.

        Typical usage example:
            valve_id = 1

            inrush_current_ma = 100

            vaem.set_inrush_current(valve_id = valve_id, inrush_current = inrush_current_ma)

        Args:
            valve_id (int): Target valve for selection
            inrush_current (int): In mA the new inrush current for the valve

        Returns:
            None

        Raises:
            ValueError: Valve index out of bounds
            ValueError: Input value for current not in range 20 - 1000 mA
        """
        if self._init_done:
            if valve_id not in range(1, 9):
                raise ValueError(f"Error, input valve ID was: {valve_id}, IDs range from 1-8")
            if inrush_current not in range(20, 1001):
                raise ValueError(
                    f"Error, input for inrush current was: {inrush_current}, inrush current ranges from 20, 1000 mA"
                )
            data = self._get_transfer_value(
                VaemAccess.WRITE.value,
                VaemIndex.INRUSHCURRENT,
                (valve_id - 1),
                int(inrush_current),
            )
            frame = self._construct_frame(data)
            self._transfer(frame)

    def get_inrush_current(self, valve_id: int) -> int | None:
        """
        Gets the Inrush Current for the selected Valve ID.

        Typical usage example:
            valve_id = 1

            inrush_current_ma = get_inrush_current(valve_id = valve_id)

            print(inrush_current_ma)

        Args:
            valve_id (int): Valve ID (1-8)

        Returns:
            Inrush current for valve ID in mA

        Raises:
            ValueError: Valve index out of bounds
        """
        if self._init_done:
            if valve_id not in range(1, 9):
                raise ValueError(f"Error, input valve ID was: {valve_id}, IDs range from 1-8")
            data = self._get_transfer_value(
                VaemAccess.READ.value,
                VaemIndex.INRUSHCURRENT,
                (valve_id - 1),
                0,
            )
            frame = self._construct_frame(data)
            resp = self._transfer(frame)
            return self._deconstruct_frame(resp)["transferValue"]
        return None

    def set_nominal_voltage(self, valve_id: int, voltage: int) -> None:
        """
        Sets the nominal voltage on the valve ID specified.

        Typical usage example:
            valve_id = 1

            voltage_mv = 10000

            vaem.set_nominal_voltage(valve_id = valve_id, voltage = voltage_mv)

        Args:
            valve_id (int): ID number of valve for setting (1-8)
            voltage (int): Voltage to be set in mV (8000-24000)

        Returns:
            None

        Raises:
            ValueError: Valve index out of bounds
            ValueError: Input value for voltage not in range 8000 - 24000 mV
        """
        if self._init_done:
            if valve_id not in range(1, 9):
                raise ValueError(f"Error, input valve ID was: {valve_id}, IDs range from 1-8")
            if voltage not in range(8000, 24001):
                raise ValueError(f"Error, input voltage was: {voltage}, input voltage ranges from 8000-24000 mV")
            data = self._get_transfer_value(
                VaemAccess.WRITE.value,
                VaemIndex.NOMINALVOLTAGE,
                (valve_id - 1),
                voltage,
            )
            frame = self._construct_frame(data)
            self._transfer(frame)

    def get_nominal_voltage(self, valve_id: int) -> int | None:
        """
        Gets the nominal voltage for the specified valve ID.

        Typical usage example:
            valve_id = 1

            voltage_mv = get_nominal_voltage(valve_id = valve_id)

        Args:
            valve_id (int): Valve ID (1-8)

        Returns:
            Nominal voltage in mV

        Raises:
            ValueError: Valve index out of bounds
        """
        if self._init_done:
            if valve_id not in range(1, 9):
                raise ValueError(f"Error, input valve ID was: {valve_id}, IDs range from 1-8")
            data = self._get_transfer_value(
                VaemAccess.READ.value,
                VaemIndex.NOMINALVOLTAGE,
                (valve_id - 1),
                0,
            )
            frame = self._construct_frame(data)
            resp = self._transfer(frame)
            return self._deconstruct_frame(resp)["transferValue"]
        return None

    def get_valve_switching_time(self, valve_id: int) -> int | None:
        """
        Gets the switching time in ms for the specific valve ID.

        Typical usage example:
            valve_id = 1

            switching_time_ms = vaem.get_valve_switching_time(valve_id = valve_id)

            print(switching_time_ms)

        Args:
            valve_id (int): Valve ID (1-8)

        Returns:
            Switching time in ms

        Raises:
            ValueError: Valve index out of bounds
        """
        if self._init_done:
            if valve_id not in range(1, 9):
                raise ValueError(f"Error, input valve ID was: {valve_id}, IDs range from 1-8")
            data = self._get_transfer_value(
                VaemAccess.READ.value,
                VaemIndex.SWITCHINGTIME,
                (valve_id - 1),
                0,
            )
            frame = self._construct_frame(data)
            resp = self._transfer(frame)
            return int(self._deconstruct_frame(resp)["transferValue"] * 0.2)
        return None

    def get_delay_time(self, valve_id: int) -> int | None:
        """
        Gets the current delay time for the valve ID.

        Typical usage example:
            valve_id = 1

            delay_time_ms = vaem.get_delay_time(valve_id = valve_id)

            print(delay_time_ms)

        Args:
            valve_id (int): Valve ID (1-8)

        Returns:
            Delay time for the valve ID in ms

        Raises:
            ValueError: Valve index out of bounds
        """
        if self._init_done:
            if valve_id not in range(1, 9):
                raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
            data = self._get_transfer_value(
                VaemAccess.READ.value,
                VaemIndex.TIMEDELAY,
                (valve_id - 1),
                0,
            )
            frame = self._construct_frame(data)
            resp = self._transfer(frame)
            return int(self._deconstruct_frame(resp)["transferValue"] * 0.2)
        return None

    def set_delay_time(self, valve_id: int, delay_time: int) -> None:
        """
        Sets the delay time for a specific valve ID.

        Typical usage example:
            valve_id = 1

            delay_time = 100

            vaem.set_delay_time(valve_id = valve_id, delay_time = delay_time)

        Args:
            valve_id (int): Valve ID (1-8)
            delay_time (int): Delay time to be set for the valve ID

        Returns:
            None

        Raises:
            ValueError: Valve index out of bounds
        """
        if self._init_done:
            if valve_id not in range(1, 9):
                raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
            delay_time = int(delay_time / 0.2)
            data = self._get_transfer_value(
                VaemAccess.WRITE.value,
                VaemIndex.TIMEDELAY,
                (valve_id - 1),
                delay_time,
            )
            frame = self._construct_frame(data)
            self._transfer(frame)

    def get_pickup_time(self, valve_id: int) -> int | None:
        """
        Gets the pickup time for the selected valve ID (1-8).

        Typical usage example:
            valve_id = 1

            pickup_time = vaem.get_pickup_time(valve_id = valve_id)

            print(pickup_time)

        Args:
            valve_id (int): Valve ID 1-8

        Returns:
            Pickup time in ms

        Raises:
            ValueError: Valve index out of bounds
        """
        if self._init_done:
            if valve_id not in range(1, 9):
                raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
            data = self._get_transfer_value(
                VaemAccess.READ.value,
                VaemIndex.PICKUPTIME,
                (valve_id - 1),
                0,
            )
            frame = self._construct_frame(data)
            resp = self._transfer(frame)
            return int(self._deconstruct_frame(resp)["transferValue"] * 0.2)
        return None

    def set_pickup_time(self, valve_id: int, pickup_time: int) -> None:
        """
        Writes the pickup time for one valve.

        Does nothing when the client is not initialised.

        Typical usage example:
            valve_id = 1

            pickup_time = 100

            vaem.set_pickup_time(valve_id = valve_id, pickup_time = pickup_time)

        Args:
            valve_id (int): ID number for valve (1-8)
            pickup_time (int): Pickup time in ms (1-500)

        Returns:
            None

        Raises:
            ValueError: Valve index out of bounds
            ValueError: Input value for pickup time not in range 1 - 500 ms
        """
        if not self._init_done:
            return
        if valve_id not in range(1, 9):
            raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
        if pickup_time not in range(1, 501):
            raise ValueError(f"Error, input pickup time was {pickup_time} ms, This is out of the range of 1-500 ms")
        # Milliseconds are divided by 0.2 to obtain the value that is written.
        raw_pickup = int(pickup_time / 0.2)
        request = self._get_transfer_value(
            VaemAccess.WRITE.value,
            VaemIndex.PICKUPTIME,
            valve_id - 1,
            raw_pickup,
        )
        self._transfer(self._construct_frame(request))

    def get_holding_current(self, valve_id: int) -> int | None:
        """
        Gets the current holding current for the valve selected 1-8.

        Typical usage example:
            valve_id = 1

            holding_current = vaem.get_holding_current(valve_id = valve_id)

            print(holding_current)

        Args:
            valve_id (int): Valve ID (1-8)

        Returns:
            Holding current of valve in mA

        Raises:
            ValueError: Valve index out of bounds
        """
        if self._init_done:
            if valve_id not in range(1, 9):
                raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
            data = self._get_transfer_value(
                VaemAccess.READ.value,
                VaemIndex.HOLDINGCURRENT,
                (valve_id - 1),
                0,
            )
            frame = self._construct_frame(data)
            resp = self._transfer(frame)
            return self._deconstruct_frame(resp)["transferValue"]
        return None

    def set_holding_current(self, valve_id: int, holding_current: int) -> None:
        """
        Writes the holding current for one valve.

        Does nothing when the client is not initialised.

        Typical usage example:
            valve_id = 1

            holding_current = 100

            vaem.set_holding_current(valve_id = valve_id, holding_current = holding_current)

        Args:
            valve_id (int): Valve ID (1-8)
            holding_current (int): Holding current in mA (20-400)

        Returns:
            None

        Raises:
            ValueError: Valve index out of bounds
            ValueError: Input value for holding current not in range 20 - 400 mA
        """
        if not self._init_done:
            return
        if valve_id not in range(1, 9):
            raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
        if holding_current not in range(20, 401):
            raise ValueError(f"Error, input holding current out of range: {holding_current}")
        # The current is written as-is (no scaling), coerced to int.
        request = self._get_transfer_value(
            VaemAccess.WRITE.value,
            VaemIndex.HOLDINGCURRENT,
            valve_id - 1,
            int(holding_current),
        )
        self._transfer(self._construct_frame(request))

    def get_current_reduction_time(self, valve_id: int) -> int | None:
        """
        Gets the time that the current is reduced to the holding current value for the valve selected 1-8.

        Typical usage example:
            valve_id = 1

            reduction_time = vaem.get_current_reduction_time(valve_id = valve_id)

            print(reduction_time)

        Args:
            valve_id (int): Valve ID (1-8)

        Returns:
            Current reduction time in ms

        Raises:
            ValueError: Valve index out of bounds
        """
        if self._init_done:
            if valve_id not in range(1, 9):
                raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
            data = self._get_transfer_value(
                VaemAccess.READ.value,
                VaemIndex.CURRENTREDUCTIONTIME,
                (valve_id - 1),
                0,
            )
            frame = self._construct_frame(data)
            resp = self._transfer(frame)
            return int(self._deconstruct_frame(resp)["transferValue"] * 0.2)
        return None

    def set_current_reduction_time(self, valve_id: int, reduction_time: int) -> None:
        """
        Writes the time over which the current is reduced to the holding current for one valve.

        Does nothing when the client is not initialised. The reduction time itself
        is not range-checked here.

        Typical usage example:
            valve_id = 1

            current_reduction_time = 100

            vaem.set_current_reduction_time(valve_id = valve_id, reduction_time = current_reduction_time)

        Args:
            valve_id (int): Valve ID (1-8)
            reduction_time (int): Desired length of time to go from inrush current to holding current in ms

        Returns:
            None

        Raises:
            ValueError: Valve index out of bounds
        """
        if not self._init_done:
            return
        if valve_id not in range(1, 9):
            raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
        # Milliseconds are multiplied by 5 to obtain the value that is written.
        raw_reduction = int(reduction_time * 5)
        request = self._get_transfer_value(
            VaemAccess.WRITE.value,
            VaemIndex.CURRENTREDUCTIONTIME,
            valve_id - 1,
            raw_reduction,
        )
        self._transfer(self._construct_frame(request))

    def set_error_handling(self, activate: int) -> None:
        """
        Sets the internal error handling of the vaem. Disabling this will cause the VAEM to omit certain errors.

        Does nothing when the client is not initialised. On success the chosen
        state is also stored in `self.error_handling_enabled`.

        Typical usage example:
            turn_off_handling = 0

            vaem.set_error_handling(activate = turn_off_handling)

        Args:
            activate (int): 1 or 0. 1 activates the error handling and 0 disables error handling

        Returns:
            None

        Raises:
            ValueError: Input value for activation was not a 1 or 0
        """
        if not self._init_done:
            return
        if activate not in (0, 1):
            raise ValueError(f"Error, value inputted was {activate}, Either a 1 or 0 is accepted")
        # The written value is the logical inverse of `activate`
        # (0 is sent to enable, 1 to disable).
        request = self._get_transfer_value(
            VaemAccess.WRITE.value,
            VaemIndex.ERRORHANDLING,
            0,
            int(not activate),
        )
        self._transfer(self._construct_frame(request))
        self.error_handling_enabled = activate
        if activate:
            logger.info("Error handling is enabled")
        else:
            # Single-line message: the previous triple-quoted literal leaked a
            # newline and source indentation into the log output.
            logger.warning(
                "WARNING: Disabling error handling will cause the device to omit certain errors and "
                "certain functionalities of the driver will be disabled"
            )

    def get_error_handling_status(self) -> int | None:
        """
        Gets the current state of the internal error handling of the VAEM device.

        Typical usage example:
            error_handling_status = vaem.get_error_handling_status()

            print(error_handling_status)

        Args:
            None

        Returns:
            State of internal error handling. 1 for enabled, 0 for disabled
        """
        if self._init_done:
            data = self._get_transfer_value(
                VaemAccess.READ.value,
                VaemIndex.ERRORHANDLING,
                0,
                0,
            )
            frame = self._construct_frame(data)
            resp = self._transfer(frame)
            return int(not self._deconstruct_frame(resp)["transferValue"])
        return None

__init__(config) abstractmethod

VAEMModbusClient constructor.

Abstract base class to build out VAEM clients.

Parameters:

Name Type Description Default
config VAEMConfig

A ModbusTCP or ModbusSerial type to allow the driver to connect to the correct communication interface.

required

Returns:

Type Description

None

Source code in src/vaem/vaem_communication.py
@abstractmethod
def __init__(self, config: VAEMConfig):
    """
    VAEMModbusClient constructor.

    Abstract base initialiser that sets up the state shared by VAEM clients.

    Args:
        config (VAEMConfig): A ModbusTCP or ModbusSerial
                type to allow the driver to connect to the
                correct communication interface.

    Returns:
        None
    """
    self._config = config
    self.version = None
    # Read and write parameters start out with the same address and length,
    # but are kept as two separate dict objects.
    self._read_param = {"address": 0, "length": 0x07}
    self._write_param = {"address": 0, "length": 0x07}
    self._init_done = True
    # Error handling is recorded as enabled (1) by default.
    self.error_handling_enabled = 1
    # One flag per valve (8 entries), all initially 0.
    self.active_valves = [0] * 8

clear_error()

If any error occurs in valve opening, it must be cleared with this operation.

Typical usage example

vaem.clear_error()

Returns:

Type Description
None

None

Source code in src/vaem/vaem_communication.py
def clear_error(self) -> None:
    """
    Clears device errors by writing the reset-errors control word.

    If any error occurs in valve opening, it must be cleared with this operation.
    Only logs a warning when the client is not initialised.

    Typical usage example:
        vaem.clear_error()

    Args:
        None

    Returns:
        None
    """
    if not self._init_done:
        logger.warning("No VAEM Connected!!")
        return
    reset_request = self._get_transfer_value(
        VaemAccess.WRITE.value,
        VaemIndex.CONTROLWORD,
        0,
        VaemControlWords.RESETERRORS.value,
    )
    self._transfer(self._construct_frame(reset_request))

close_valves()

Closes valves that were previously selected.

Typical usage example

vaem.close_valves()

Returns:

Type Description
None

None

Source code in src/vaem/vaem_communication.py
def close_valves(self) -> None:
    """
    Closes valves that were previously selected.

    Writes the stop-valves control word and then calls `clear_error()`.
    Only logs a warning when the client is not initialised.

    Typical usage example:
        vaem.close_valves()

    Args:
        None

    Returns:
        None
    """
    if not self._init_done:
        logger.warning("No VAEM Connected!!")
        return
    # Write the stop-valves control word.
    stop_request = self._get_transfer_value(
        VaemAccess.WRITE.value,
        VaemIndex.CONTROLWORD,
        0,
        VaemControlWords.STOPVALVES.value,
    )
    self._transfer(self._construct_frame(stop_request))
    self.clear_error()

deselect_valve(valve_id)

Deselects one valve in the VAEM.

According to VAEM Logic all selected valves can be opened, others cannot with open command

Typical usage example

for valve_id in range(1, 9):

    vaem.deselect_valve(valve_id)

Parameters:

Name Type Description Default
valve_id int

The ID of the valve to deselect. Valid numbers are from 1 to 8

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

Valve index out of bounds

Source code in src/vaem/vaem_communication.py
def deselect_valve(self, valve_id: int) -> None:
    """
    Deselects one valve in the VAEM.

    According to VAEM Logic all selected valves can be opened,
    others cannot with open command. The current selection is read back,
    the bit for this valve is cleared, and the result is written again.
    Only logs a warning when the client is not initialised.

    Typical usage example:
        for valve_id in range(1, 9):

            vaem.deselect_valve(valve_id)

    Args:
        valve_id (int): The ID of the valve to deselect. Valid numbers are from 1 to 8

    Returns:
        None

    Raises:
        ValueError: Valve index out of bounds
    """
    if not self._init_done:
        logger.warning("No VAEM Connected!!")
        return
    if valve_id not in range(1, 9):
        logger.error("Valve ID's have a range of 1-8, Inputted : %s", valve_id)
        raise ValueError(f"Valve index out of bounds: {valve_id}")

    # Read the currently selected valves.
    # NOTE(review): this helper is called with three arguments here but four
    # elsewhere in this file - presumably the last one has a default; confirm.
    read_request = self._get_transfer_value(
        VaemAccess.READ.value,
        VaemIndex.SELECTVALVE,
        vaemValveIndex[valve_id],
    )
    reply = self._transfer(self._construct_frame(read_request))

    # Clear this valve's bit(s) from the value read back and write it again.
    remaining = self._deconstruct_frame(reply)["transferValue"] & (~(vaemValveIndex[valve_id]))
    write_request = self._get_transfer_value(
        VaemAccess.WRITE.value,
        VaemIndex.SELECTVALVE,
        remaining,
    )
    self._transfer(self._construct_frame(write_request))
    self.active_valves[valve_id - 1] = 0

get_current_reduction_time(valve_id)

Gets the time that the current is reduced to the holding current value for the valve selected 1-8.

Typical usage example

valve_id = 1

reduction_time = vaem.get_current_reduction_time(valve_id = valve_id)

print(reduction_time)

Parameters:

Name Type Description Default
valve_id int

Valve ID (1-8)

required

Returns:

Type Description
int | None

Current reduction time in ms

Raises:

Type Description
ValueError

Valve index out of bounds

Source code in src/vaem/vaem_communication.py
def get_current_reduction_time(self, valve_id: int) -> int | None:
    """
    Gets the time that the current is reduced to the holding current value for the valve selected 1-8.

    Typical usage example:
        valve_id = 1

        reduction_time = vaem.get_current_reduction_time(valve_id = valve_id)

        print(reduction_time)

    Args:
        valve_id (int): Valve ID (1-8)

    Returns:
        Current reduction time in ms

    Raises:
        ValueError: Valve index out of bounds
    """
    if self._init_done:
        if valve_id not in range(1, 9):
            raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
        data = self._get_transfer_value(
            VaemAccess.READ.value,
            VaemIndex.CURRENTREDUCTIONTIME,
            (valve_id - 1),
            0,
        )
        frame = self._construct_frame(data)
        resp = self._transfer(frame)
        return int(self._deconstruct_frame(resp)["transferValue"] * 0.2)
    return None

get_delay_time(valve_id)

Gets the current delay time for the valve ID.

Typical usage example

valve_id = 1

delay_time_ms = vaem.get_delay_time(valve_id = valve_id)

print(delay_time_ms)

Parameters:

Name Type Description Default
valve_id int

Valve ID (1-8)

required

Returns:

Type Description
int | None

Delay time for the valve ID in ms

Raises:

Type Description
ValueError

Valve index out of bounds

Source code in src/vaem/vaem_communication.py
def get_delay_time(self, valve_id: int) -> int | None:
    """
    Gets the current delay time for the valve ID.

    Typical usage example:
        valve_id = 1

        delay_time_ms = vaem.get_delay_time(valve_id = valve_id)

        print(delay_time_ms)

    Args:
        valve_id (int): Valve ID (1-8)

    Returns:
        Delay time for the valve ID in ms

    Raises:
        ValueError: Valve index out of bounds
    """
    if self._init_done:
        if valve_id not in range(1, 9):
            raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
        data = self._get_transfer_value(
            VaemAccess.READ.value,
            VaemIndex.TIMEDELAY,
            (valve_id - 1),
            0,
        )
        frame = self._construct_frame(data)
        resp = self._transfer(frame)
        return int(self._deconstruct_frame(resp)["transferValue"] * 0.2)
    return None

get_error_handling_status()

Gets the current state of the internal error handling of the VAEM device.

Typical usage example

error_handling_status = vaem.get_error_handling_status()

print(error_handling_status)

Returns:

Type Description
int | None

State of internal error handling. 1 for enabled, 0 for disabled

Source code in src/vaem/vaem_communication.py
def get_error_handling_status(self) -> int | None:
    """
    Gets the current state of the internal error handling of the VAEM device.

    Typical usage example:
        error_handling_status = vaem.get_error_handling_status()

        print(error_handling_status)

    Args:
        None

    Returns:
        State of internal error handling. 1 for enabled, 0 for disabled
    """
    if self._init_done:
        data = self._get_transfer_value(
            VaemAccess.READ.value,
            VaemIndex.ERRORHANDLING,
            0,
            0,
        )
        frame = self._construct_frame(data)
        resp = self._transfer(frame)
        return int(not self._deconstruct_frame(resp)["transferValue"])
    return None

get_holding_current(valve_id)

Gets the current holding current for the valve selected 1-8.

Typical usage example

valve_id = 1

holding_current = vaem.get_holding_current(valve_id = valve_id)

print(holding_current)

Parameters:

Name Type Description Default
valve_id int

Valve ID (1-8)

required

Returns:

Type Description
int | None

Holding current of valve in mA

Raises:

Type Description
ValueError

Valve index out of bounds

Source code in src/vaem/vaem_communication.py
def get_holding_current(self, valve_id: int) -> int | None:
    """
    Gets the current holding current for the valve selected 1-8.

    Typical usage example:
        valve_id = 1

        holding_current = vaem.get_holding_current(valve_id = valve_id)

        print(holding_current)

    Args:
        valve_id (int): Valve ID (1-8)

    Returns:
        Holding current of valve in mA

    Raises:
        ValueError: Valve index out of bounds
    """
    if self._init_done:
        if valve_id not in range(1, 9):
            raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
        data = self._get_transfer_value(
            VaemAccess.READ.value,
            VaemIndex.HOLDINGCURRENT,
            (valve_id - 1),
            0,
        )
        frame = self._construct_frame(data)
        resp = self._transfer(frame)
        return self._deconstruct_frame(resp)["transferValue"]
    return None

get_inrush_current(valve_id)

Gets the Inrush Current for the selected Valve ID.

Typical usage example

valve_id = 1

inrush_current_ma = vaem.get_inrush_current(valve_id = valve_id)

print(inrush_current_ma)

Parameters:

Name Type Description Default
valve_id int

Valve ID (1-8)

required

Returns:

Type Description
int | None

Inrush current for valve ID in mA

Raises:

Type Description
ValueError

Valve index out of bounds

Source code in src/vaem/vaem_communication.py
def get_inrush_current(self, valve_id: int) -> int | None:
    """
    Gets the Inrush Current for the selected Valve ID.

    Typical usage example:
        valve_id = 1

        inrush_current_ma = get_inrush_current(valve_id = valve_id)

        print(inrush_current_ma)

    Args:
        valve_id (int): Valve ID (1-8)

    Returns:
        Inrush current for valve ID in mA

    Raises:
        ValueError: Valve index out of bounds
    """
    if self._init_done:
        if valve_id not in range(1, 9):
            raise ValueError(f"Error, input valve ID was: {valve_id}, IDs range from 1-8")
        data = self._get_transfer_value(
            VaemAccess.READ.value,
            VaemIndex.INRUSHCURRENT,
            (valve_id - 1),
            0,
        )
        frame = self._construct_frame(data)
        resp = self._transfer(frame)
        return self._deconstruct_frame(resp)["transferValue"]
    return None

get_nominal_voltage(valve_id)

Gets the nominal voltage for the specified valve ID.

Typical usage example

valve_id = 1

voltage_mv = vaem.get_nominal_voltage(valve_id = valve_id)

Parameters:

Name Type Description Default
valve_id int

Valve ID (1-8)

required

Returns:

Type Description
int | None

Nominal voltage in mV

Raises:

Type Description
ValueError

Valve index out of bounds

Source code in src/vaem/vaem_communication.py
def get_nominal_voltage(self, valve_id: int) -> int | None:
    """
    Gets the nominal voltage for the specified valve ID.

    Typical usage example:
        valve_id = 1

        voltage_mv = get_nominal_voltage(valve_id = valve_id)

    Args:
        valve_id (int): Valve ID (1-8)

    Returns:
        Nominal voltage in mV

    Raises:
        ValueError: Valve index out of bounds
    """
    if self._init_done:
        if valve_id not in range(1, 9):
            raise ValueError(f"Error, input valve ID was: {valve_id}, IDs range from 1-8")
        data = self._get_transfer_value(
            VaemAccess.READ.value,
            VaemIndex.NOMINALVOLTAGE,
            (valve_id - 1),
            0,
        )
        frame = self._construct_frame(data)
        resp = self._transfer(frame)
        return self._deconstruct_frame(resp)["transferValue"]
    return None

get_pickup_time(valve_id)

Gets the pickup time for the selected valve ID (1-8).

Typical usage example

valve_id = 1

pickup_time = vaem.get_pickup_time(valve_id = valve_id)

print(pickup_time)

Parameters:

Name Type Description Default
valve_id int

Valve ID 1-8

required

Returns:

Type Description
int | None

Pickup time in ms

Raises:

Type Description
ValueError

Valve index out of bounds

Source code in src/vaem/vaem_communication.py
def get_pickup_time(self, valve_id: int) -> int | None:
    """
    Gets the pickup time for the selected valve ID (1-8).

    Typical usage example:
        valve_id = 1

        pickup_time = vaem.get_pickup_time(valve_id = valve_id)

        print(pickup_time)

    Args:
        valve_id (int): Valve ID 1-8

    Returns:
        Pickup time in ms

    Raises:
        ValueError: Valve index out of bounds
    """
    if self._init_done:
        if valve_id not in range(1, 9):
            raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
        data = self._get_transfer_value(
            VaemAccess.READ.value,
            VaemIndex.PICKUPTIME,
            (valve_id - 1),
            0,
        )
        frame = self._construct_frame(data)
        resp = self._transfer(frame)
        return int(self._deconstruct_frame(resp)["transferValue"] * 0.2)
    return None

get_status()

Read the status of the VAEM.

The status is returned as a dictionary with the following keys:

-> status: 1 if more than 1 valve is active

-> error: 1 if error in valves is present

Typical usage example

status = vaem.get_status()

print(status)

Returns:

Type Description
dict

Dictionary of the status for the device. For more information, please refer to the VAEM Operation Instruction manual.

Source code in src/vaem/vaem_communication.py
def get_status(self) -> dict:
    """
    Read the status of the VAEM.

    The status is returned as a dictionary with the following keys:

    -> status: 1 if more than 1 valve is active

    -> error: 1 if error in valves is present

    The decoded status is also logged at INFO level. When the client is not
    initialised a warning is logged and an empty dictionary is returned.

    Typical usage example:
        status = vaem.get_status()

        print(status)

    Args:
        None

    Returns:
        Dictionary of the status for the device. For more information, please refer to the VAEM Operation Instruction manual.
    """
    if not self._init_done:
        logger.warning("No VAEM Connected!!")
        return {}
    request = self._get_transfer_value(
        VaemAccess.READ.value,
        VaemIndex.STATUSWORD,
        0,
        0,
    )
    reply = self._transfer(self._construct_frame(request))
    # Decode the response once and reuse it for both the log line and the
    # return value instead of deconstructing the frame twice.
    status = self._get_status(self._deconstruct_frame(reply)["transferValue"])
    logger.info(status)
    return status

get_valve_switching_time(valve_id)

Gets the switching time in ms for the specific valve ID.

Typical usage example

valve_id = 1

switching_time_ms = vaem.get_valve_switching_time(valve_id = valve_id)

print(switching_time_ms)

Parameters:

Name Type Description Default
valve_id int

Valve ID (1-8)

required

Returns:

Type Description
int | None

Switching time in ms

Raises:

Type Description
ValueError

Valve index out of bounds

Source code in src/vaem/vaem_communication.py
def get_valve_switching_time(self, valve_id: int) -> int | None:
    """
    Gets the switching time in ms for the specific valve ID.

    Typical usage example:
        valve_id = 1

        switching_time_ms = vaem.get_valve_switching_time(valve_id = valve_id)

        print(switching_time_ms)

    Args:
        valve_id (int): Valve ID (1-8)

    Returns:
        Switching time in ms

    Raises:
        ValueError: Valve index out of bounds
    """
    if self._init_done:
        if valve_id not in range(1, 9):
            raise ValueError(f"Error, input valve ID was: {valve_id}, IDs range from 1-8")
        data = self._get_transfer_value(
            VaemAccess.READ.value,
            VaemIndex.SWITCHINGTIME,
            (valve_id - 1),
            0,
        )
        frame = self._construct_frame(data)
        resp = self._transfer(frame)
        return int(self._deconstruct_frame(resp)["transferValue"] * 0.2)
    return None

open_selected_valves()

Opens all valves that are selected.

Typical usage example

vaem.open_selected_valves()

Returns: None

Source code in src/vaem/vaem_communication.py
def open_selected_valves(self) -> None:
    """
    Opens all valves that are selected.

    When error handling is recorded as enabled, the stop-valves control word is
    written first. The start-valves control word is always written, followed by
    a call to `clear_error()`. Only logs a warning when the client is not
    initialised.

    Typical usage example:
        vaem.open_selected_valves()

    Args:
        None
    Returns:
        None
    """
    if not self._init_done:
        logger.warning("No VAEM Connected!!")
        return

    if self.error_handling_enabled:
        # With error handling on, a stop command precedes the start command.
        stop_request = self._get_transfer_value(
            VaemAccess.WRITE.value,
            VaemIndex.CONTROLWORD,
            0,
            VaemControlWords.STOPVALVES.value,
        )
        self._transfer(self._construct_frame(stop_request))

    # The start command is sent regardless of the error handling state.
    start_request = self._get_transfer_value(
        VaemAccess.WRITE.value,
        VaemIndex.CONTROLWORD,
        0,
        VaemControlWords.STARTVALVESRESETERROR.value,
    )
    self._transfer(self._construct_frame(start_request))

    # reset the control word
    self.clear_error()

open_valves(timings)

Selects and opens valves with specified actuation times.

Typical usage example

valve_opening_times = {1: 100, 2: 100, 3: 100, 4: 100, 5: 100, 6: 100, 7: 100, 8: 100, }

vaem.open_valves(timings = valve_opening_times)

Parameters:

Name Type Description Default
timings dict

Dictionary of valve indices and actuation times

required

Returns:

Type Description
None

None

Source code in src/vaem/vaem_communication.py
def open_valves(self, timings: dict[int, int]) -> None:
    """
    Selects and opens valves with specified actuation times.

    Each valve in `timings` is selected and given its switching time, in
    dictionary order; afterwards all selected valves are opened with a single
    call to `open_selected_valves()`.

    Typical usage example:
        valve_opening_times = {1: 100,
                            2: 100,
                            3: 100,
                            4: 100,
                            5: 100,
                            6: 100,
                            7: 100,
                            8: 100,
                            }

        vaem.open_valves(timings = valve_opening_times)

    Args:
        timings (dict): Dictionary of valve indices and actuation times

    Returns:
        None
    """
    for valve_id, opening_time in timings.items():
        self.select_valve(valve_id=valve_id)
        self.set_valve_switching_time(valve_id=valve_id, opening_time=opening_time)
    # Opened even when `timings` is empty, exactly like the per-valve calls above
    # this is delegated to the instance.
    self.open_selected_valves()

save_settings()

Saves all parameters to non-volatile memory.

Typical usage example

vaem.save_settings()

Returns:

Type Description
None

None

Source code in src/vaem/vaem_communication.py
def save_settings(self) -> None:
    """
    Saves all parameters to non-volatile memory.

    Builds the request dictionary directly (rather than through
    `_get_transfer_value`) and transfers it. Only logs a warning when the
    client is not initialised.

    Typical usage example:
        vaem.save_settings()

    Args:
        None

    Returns:
        None
    """
    if not self._init_done:
        logger.warning("No VAEM Connected!!")
        return
    save_request = {
        "access": VaemAccess.WRITE.value,
        "dataType": VaemDataType.UINT32.value,
        "paramIndex": VaemIndex.SAVEPARAMETERS.value,
        "paramSubIndex": 0,
        "errorRet": 0,
        # NOTE(review): 99999 is presumably a device-defined trigger value for
        # the save command - confirm against the VAEM manual.
        "transferValue": 99999,
    }
    self._transfer(self._construct_frame(save_request))

select_valve(valve_id)

Selects one valve in the VAEM.

According to VAEM Logic all selected valves can be opened, others cannot with open command

Typical usage example

valve_id = 1

vaem.select_valve(valve_id = valve_id)

Parameters:

Name Type Description Default
valve_id int

The id of the valve to select

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

Valve index out of bounds

Source code in src/vaem/vaem_communication.py
def select_valve(self, valve_id: int) -> None:
    """
    Selects one valve in the VAEM.

    According to VAEM Logic all selected valves can be opened,
    others cannot with open command. The current selection is read back,
    the bit for this valve is OR-ed in, and the result is written again.
    Only logs a warning when the client is not initialised.

    Typical usage example:
        valve_id = 1

        vaem.select_valve(valve_id = valve_id)

    Args:
        valve_id (int): The id of the valve to select. Valid numbers are from 1 to 8

    Returns:
        None

    Raises:
        ValueError: Valve index out of bounds
    """
    if not self._init_done:
        logger.warning("No VAEM Connected!!")
        return
    if valve_id not in range(1, 9):
        logger.error("Valve ID's have a range of 1-8, Inputted : %s", valve_id)
        raise ValueError(f"Valve index out of bounds: {valve_id}")

    # Read the currently selected valves.
    # NOTE(review): this helper is called with three arguments here but four
    # elsewhere in this file - presumably the last one has a default; confirm.
    read_request = self._get_transfer_value(
        VaemAccess.READ.value,
        VaemIndex.SELECTVALVE,
        vaemValveIndex[valve_id],
    )
    reply = self._transfer(self._construct_frame(read_request))

    # Add this valve's bit(s) to the value read back and write it again.
    combined = vaemValveIndex[valve_id] | self._deconstruct_frame(reply)["transferValue"]
    write_request = self._get_transfer_value(
        VaemAccess.WRITE.value,
        VaemIndex.SELECTVALVE,
        combined,
    )
    self._transfer(self._construct_frame(write_request))
    self.active_valves[valve_id - 1] = 1

set_current_reduction_time(valve_id, reduction_time)

Sets the time that the current is reduced to the holding current value for the valve selected 1-8.

Typical usage example

valve_id = 1

current_reduction_time = 100

vaem.set_current_reduction_time(valve_id = valve_id, reduction_time = current_reduction_time)

Parameters:

Name Type Description Default
valve_id int

Valve ID (1-8)

required
reduction_time int

Desired length of time to go from inrush current to holding current in ms

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

Valve index out of bounds

Source code in src/vaem/vaem_communication.py
def set_current_reduction_time(self, valve_id: int, reduction_time: int) -> None:
    """
    Writes the time over which the current is reduced to the holding current for one valve.

    Does nothing when the client is not initialised. The reduction time itself
    is not range-checked here.

    Typical usage example:
        valve_id = 1

        current_reduction_time = 100

        vaem.set_current_reduction_time(valve_id = valve_id, reduction_time = current_reduction_time)

    Args:
        valve_id (int): Valve ID (1-8)
        reduction_time (int): Desired length of time to go from inrush current to holding current in ms

    Returns:
        None

    Raises:
        ValueError: Valve index out of bounds
    """
    if not self._init_done:
        return
    if valve_id not in range(1, 9):
        raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
    # Milliseconds are multiplied by 5 to obtain the value that is written.
    raw_reduction = int(reduction_time * 5)
    request = self._get_transfer_value(
        VaemAccess.WRITE.value,
        VaemIndex.CURRENTREDUCTIONTIME,
        valve_id - 1,
        raw_reduction,
    )
    self._transfer(self._construct_frame(request))

set_delay_time(valve_id, delay_time)

Sets the delay time for a specific valve ID.

Typical usage example

valve_id = 1

delay_time = 100

vaem.set_delay_time(valve_id = valve_id, delay_time = delay_time)

Parameters:

Name Type Description Default
valve_id int

Valve ID (1-8)

required
delay_time int

Delay time to be set for the valve ID

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

Valve index out of bounds

Source code in src/vaem/vaem_communication.py
def set_delay_time(self, valve_id: int, delay_time: int) -> None:
    """
    Write a new delay time for the valve with the given ID.

    Nothing is validated or sent while ``self._init_done`` is falsy.

    Typical usage example:
        valve_id = 1

        delay_time = 100

        vaem.set_delay_time(valve_id = valve_id, delay_time = delay_time)

    Args:
        valve_id (int): Valve ID (1-8)
        delay_time (int): Delay time to be set for the valve ID

    Returns:
        None

    Raises:
        ValueError: Valve index out of bounds
    """
    if not self._init_done:
        return
    if valve_id not in range(1, 9):
        raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")

    # The value handed to the transfer layer is the requested time divided by 0.2.
    scaled_delay = int(delay_time / 0.2)
    request = (
        VaemAccess.WRITE.value,
        VaemIndex.TIMEDELAY,
        valve_id - 1,  # the 1-based valve ID is shifted down by one
        scaled_delay,
    )
    payload = self._get_transfer_value(*request)
    self._transfer(self._construct_frame(payload))

set_error_handling(activate)

Sets the internal error handling of the vaem. Disabling this will cause the VAEM to omit certain errors.

Typical usage example

turn_off_handling = 0

vaem.set_error_handling(activate = turn_off_handling)

Parameters:

Name Type Description Default
activate int

1 or 0. 1 activates the error handling and 0 disables error handling

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

Input value for activation was not a 1 or 0

Source code in src/vaem/vaem_communication.py
def set_error_handling(self, activate: int) -> None:
    """
    Sets the internal error handling of the vaem. Disabling this will cause the VAEM to omit certain errors.

    Nothing is validated or sent while ``self._init_done`` is falsy. On success the
    chosen flag is stored in ``self.error_handling_enabled`` and the new state is logged.

    Typical usage example:
        turn_off_handling = 0

        vaem.set_error_handling(activate = turn_off_handling)

    Args:
        activate (int): 1 or 0. 1 activates the error handling and 0 disables error handling

    Returns:
        None

    Raises:
        ValueError: Input value for activation was not a 1 or 0
    """
    if not self._init_done:
        return
    if activate not in (0, 1):
        raise ValueError(f"Error, value inputted was {activate}, Either a 1 or 0 is accepted")

    data = self._get_transfer_value(
        VaemAccess.WRITE.value,
        VaemIndex.ERRORHANDLING,
        0,
        int(not activate),  # the value written is the logical inverse of `activate`
    )
    frame = self._construct_frame(data)
    self._transfer(frame)
    self.error_handling_enabled = activate

    # `activate` is known to equal 0 or 1 here, so a plain truth test is sufficient.
    if activate:
        logger.info("Error handling is enabled")
    else:
        # Adjacent literals keep the message on one line; a triple-quoted literal
        # would embed the source indentation in the logged text.
        logger.warning(
            "WARNING: Disabling error handling will cause the device to omit certain errors and "
            "certain functionalities of the driver will be disabled"
        )

set_holding_current(valve_id, holding_current)

Sets the holding current for the valve selected 1-8.

Typical usage example

valve_id = 1

holding_current = 100

vaem.set_holding_current(valve_id = valve_id, holding_current = holding_current)

Parameters:

Name Type Description Default
valve_id int

Valve ID (1-8)

required
holding_current int

Holding current in mA (20-400)

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

Valve index out of bounds

ValueError

Input value for holding current not in range 20 - 400 mA

Source code in src/vaem/vaem_communication.py
def set_holding_current(self, valve_id: int, holding_current: int) -> None:
    """
    Write a new holding current for one of the valves 1-8.

    Nothing is validated or sent while ``self._init_done`` is falsy.

    Typical usage example:
        valve_id = 1

        holding_current = 100

        vaem.set_holding_current(valve_id = valve_id, holding_current = holding_current)

    Args:
        valve_id (int): Valve ID (1-8)
        holding_current (int): Holding current in mA (20-400)

    Returns:
        None

    Raises:
        ValueError: Valve index out of bounds
        ValueError: Input value for holding current not in range 20 - 400 mA
    """
    if not self._init_done:
        return
    if valve_id not in range(1, 9):
        raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
    if holding_current not in range(20, 401):
        raise ValueError(f"Error, input holding current out of range: {holding_current}")

    current_value = int(holding_current)
    request = (
        VaemAccess.WRITE.value,
        VaemIndex.HOLDINGCURRENT,
        valve_id - 1,  # the 1-based valve ID is shifted down by one
        current_value,
    )
    payload = self._get_transfer_value(*request)
    self._transfer(self._construct_frame(payload))

set_inrush_current(valve_id, inrush_current)

Changes the inrush current for the valves based on valve ID.

Typical usage example

valve_id = 1

inrush_current_ma = 100

vaem.set_inrush_current(valve_id = valve_id, inrush_current = inrush_current_ma)

Parameters:

Name Type Description Default
valve_id int

Target valve for selection

required
inrush_current int

In mA the new inrush current for the valve

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

Valve index out of bounds

ValueError

Input value for current not in range 20 - 1000 mA

Source code in src/vaem/vaem_communication.py
def set_inrush_current(self, valve_id: int, inrush_current: int) -> None:
    """
    Write a new inrush current for the valve with the given ID.

    Nothing is validated or sent while ``self._init_done`` is falsy.

    Typical usage example:
        valve_id = 1

        inrush_current_ma = 100

        vaem.set_inrush_current(valve_id = valve_id, inrush_current = inrush_current_ma)

    Args:
        valve_id (int): Target valve for selection
        inrush_current (int): In mA the new inrush current for the valve

    Returns:
        None

    Raises:
        ValueError: Valve index out of bounds
        ValueError: Input value for current not in range 20 - 1000 mA
    """
    if not self._init_done:
        return
    if valve_id not in range(1, 9):
        raise ValueError(f"Error, input valve ID was: {valve_id}, IDs range from 1-8")
    if inrush_current not in range(20, 1001):
        raise ValueError(
            f"Error, input for inrush current was: {inrush_current}, inrush current ranges from 20, 1000 mA"
        )

    current_value = int(inrush_current)
    request = (
        VaemAccess.WRITE.value,
        VaemIndex.INRUSHCURRENT,
        valve_id - 1,  # the 1-based valve ID is shifted down by one
        current_value,
    )
    payload = self._get_transfer_value(*request)
    self._transfer(self._construct_frame(payload))

set_nominal_voltage(valve_id, voltage)

Sets the nominal voltage on the valve ID specified.

Typical usage example

valve_id = 1

voltage_mv = 10000

vaem.set_nominal_voltage(valve_id = valve_id, voltage = voltage_mv)

Parameters:

Name Type Description Default
valve_id int

ID number of valve for setting (1-8)

required
voltage int

Voltage to be set in mV (8000-24000)

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

Valve index out of bounds

ValueError

Input value for voltage not in range 8000 - 24000 mV

Source code in src/vaem/vaem_communication.py
def set_nominal_voltage(self, valve_id: int, voltage: int) -> None:
    """
    Write a new nominal voltage for the valve with the given ID.

    Nothing is validated or sent while ``self._init_done`` is falsy.

    Typical usage example:
        valve_id = 1

        voltage_mv = 10000

        vaem.set_nominal_voltage(valve_id = valve_id, voltage = voltage_mv)

    Args:
        valve_id (int): ID number of valve for setting (1-8)
        voltage (int): Voltage to be set in mV (8000-24000)

    Returns:
        None

    Raises:
        ValueError: Valve index out of bounds
        ValueError: Input value for voltage not in range 8000 - 24000 mV
    """
    if not self._init_done:
        return
    if valve_id not in range(1, 9):
        raise ValueError(f"Error, input valve ID was: {valve_id}, IDs range from 1-8")
    if voltage not in range(8000, 24001):
        raise ValueError(f"Error, input voltage was: {voltage}, input voltage ranges from 8000-24000 mV")

    # The voltage is forwarded unchanged.
    request = (
        VaemAccess.WRITE.value,
        VaemIndex.NOMINALVOLTAGE,
        valve_id - 1,  # the 1-based valve ID is shifted down by one
        voltage,
    )
    payload = self._get_transfer_value(*request)
    self._transfer(self._construct_frame(payload))

set_pickup_time(valve_id, pickup_time)

Sets the pickup time for the specified valve ID 1-8.

Typical usage example

valve_id = 1

pickup_time = 100

vaem.set_pickup_time(valve_id = valve_id, pickup_time = pickup_time)

Parameters:

Name Type Description Default
valve_id int

ID number for valve (1-8)

required
pickup_time int

Pickup time in ms

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

Valve index out of bounds

ValueError

Input value for pickup time not in range 1 - 500 ms

Source code in src/vaem/vaem_communication.py
def set_pickup_time(self, valve_id: int, pickup_time: int) -> None:
    """
    Write a new pickup time for one of the valves 1-8.

    Nothing is validated or sent while ``self._init_done`` is falsy.

    Typical usage example:
        valve_id = 1

        pickup_time = 100

        vaem.set_pickup_time(valve_id = valve_id, pickup_time = pickup_time)

    Args:
        valve_id (int): ID number for valve (1-8)
        pickup_time (int): Pickup time in ms

    Returns:
        None

    Raises:
        ValueError: Valve index out of bounds
        ValueError: Input value for pickup time not in range 1 - 500 ms
    """
    if not self._init_done:
        return
    if valve_id not in range(1, 9):
        raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
    if pickup_time not in range(1, 501):
        raise ValueError(f"Error, input pickup time was {pickup_time} ms, This is out of the range of 1-500 ms")

    # The value handed to the transfer layer is the requested time divided by 0.2.
    scaled_pickup = int(pickup_time / 0.2)
    request = (
        VaemAccess.WRITE.value,
        VaemIndex.PICKUPTIME,
        valve_id - 1,  # the 1-based valve ID is shifted down by one
        scaled_pickup,
    )
    payload = self._get_transfer_value(*request)
    self._transfer(self._construct_frame(payload))

set_valve_switching_time(valve_id, opening_time)

Sets the switching time for the specified valve.

Typical usage example

valve_id = 1

opening_time = 100

vaem.set_valve_switching_time(valve_id = valve_id, opening_time = opening_time)

Parameters:

Name Type Description Default
valve_id int

ID number of the valve for configuration

required
opening_time int

Time in milliseconds for which the valve with the given ID will be opened

required

Returns:

Type Description
None

None

Raises:

Type Description
ValueError

Valve index out of bounds

Source code in src/vaem/vaem_communication.py
def set_valve_switching_time(self, valve_id: int, opening_time: int) -> None:
    """
    Sets the switching time for the specified valve.

    When ``self._init_done`` is falsy a warning is logged and nothing is sent.
    The valve ID and the opening time are checked separately so that the log
    entry and the exception message name the input that was actually rejected.

    Typical usage example:
        valve_id = 1

        opening_time = 100

        vaem.set_valve_switching_time(valve_id = valve_id, opening_time = opening_time)

    Args:
        valve_id (int): ID number of the valve for configuration
        opening_time (int): Time in milliseconds for which the valve with the ID will be opened

    Returns:
        None

    Raises:
        ValueError: Valve index out of bounds
        ValueError: Opening time is negative or too large
    """
    if not self._init_done:
        logger.warning("No VAEM Connected!!")
        return

    # The value handed to the transfer layer is the requested time divided by 0.2.
    # The conversion runs before validation, so the range check applies to the converted value.
    switching_value = int(opening_time / 0.2)

    if valve_id not in range(1, 9):
        logger.error("Valve ID's have a range of 1-8, Inputted : %s", valve_id)
        raise ValueError(f"Error, input valve ID was {valve_id}, ID's range from 1-8")
    if switching_value not in range(0, 9999999999999):
        logger.error("Opening time is out of range, Inputted : %s", opening_time)
        raise ValueError(f"Error, input opening time was {opening_time} ms, This is out of the accepted range")

    data = self._get_transfer_value(
        VaemAccess.WRITE.value,
        VaemIndex.SWITCHINGTIME,
        (valve_id - 1),  # the 1-based valve ID is shifted down by one
        switching_value,
    )
    frame = self._construct_frame(data)
    self._transfer(frame)

VAEMModbusSerial

Bases: VAEMModbusClient

Class used as the interface backend for using Serial communication.

Source code in src/vaem/vaem_communication.py
class VAEMModbusSerial(VAEMModbusClient):
    """Class used as the interface backend for using Serial communication."""

    def __init__(self, config: VAEMSerialConfig):
        """
        VAEMModbusSerial Constructor.

        The serial backend is not implemented yet: after delegating to the base-class
        constructor this logs an error and then always raises ``NotImplementedError``.

        Args:
            config (VAEMSerialConfig): A configuration class designated for ModbusSerial

        Returns:
            None

        Raises:
            NotImplementedError: Always. Interface currently in development.
        """
        super().__init__(config)
        # Adjacent literals are used instead of backslash continuations inside a
        # triple-quoted string, which would embed the source indentation in the message.
        logger.error(
            "Modbus Serial backend is currently an experimental feature. "
            "Attempting operation with this feature may result in unexpected or incorrect behavior. "
            "This will be available as a fully supported feature in future releases."
        )
        # TODO: when the backend is implemented, reject configs that are not a
        # VAEMSerialConfig with a TypeError and build the ModbusSerialClient from
        # config.com_port and config.baudrate. That code used to sit below the raise,
        # where it could never execute.
        raise NotImplementedError(
            "Modbus Serial backend is not yet implemented. This will be available in future releases."
        )

__init__(config)

VAEMModbusSerial Constructor.

Parameters:

Name Type Description Default
config VAEMSerialConfig

A configuration class designated for ModbusSerial

required

Returns:

Type Description

None

Raises:

Type Description
NotImplementedError

Interface currently in development.

TypeError

Config does not match serial interface specs.

RuntimeError

A runtime error with the serial interface has occurred.

Source code in src/vaem/vaem_communication.py
def __init__(self, config: VAEMSerialConfig):
    """
    VAEMModbusSerial Constructor.

    The serial backend is not implemented yet: after delegating to the base-class
    constructor this logs an error and then always raises ``NotImplementedError``.

    Args:
        config (VAEMSerialConfig): A configuration class designated for ModbusSerial

    Returns:
        None

    Raises:
        NotImplementedError: Always. Interface currently in development.
    """
    super().__init__(config)
    # Adjacent literals are used instead of backslash continuations inside a
    # triple-quoted string, which would embed the source indentation in the message.
    logger.error(
        "Modbus Serial backend is currently an experimental feature. "
        "Attempting operation with this feature may result in unexpected or incorrect behavior. "
        "This will be available as a fully supported feature in future releases."
    )
    # TODO: when the backend is implemented, reject configs that are not a
    # VAEMSerialConfig with a TypeError and build the ModbusSerialClient from
    # config.com_port and config.baudrate. That code used to sit below the raise,
    # where it could never execute.
    raise NotImplementedError(
        "Modbus Serial backend is not yet implemented. This will be available in future releases."
    )

VAEMModbusTCP

Bases: VAEMModbusClient

VAEM Modbus TCP client class.

Source code in src/vaem/vaem_communication.py
class VAEMModbusTCP(VAEMModbusClient):
    """VAEM Modbus TCP client class."""

    client: ModbusTcpClient

    def __init__(self, config: VAEMTCPConfig):
        """
        Constructor.

        Builds a ``ModbusTcpClient`` from the config's ``ip`` and ``port``, connects and
        runs ``_vaem_init``. A ``ConnectionError`` or ``ModbusIOException`` raised while
        doing so is logged and suppressed, so construction still completes.

        Args:
            config (VAEMTCPConfig): A configuration class designated for ModbusTCP.

        Returns:
            None

        Raises:
            TypeError: Incorrect ModbusTCP config passed in
        """
        super().__init__(config)
        if not isinstance(config, VAEMTCPConfig):
            config_type = type(config)
            raise TypeError(
                f"""Error: Config does not match the ModbusTCP backend
                The type passed in was: {config_type}"""
            )
        self._config = config
        # Plain state is assigned before any network I/O so the instance carries all
        # of its attributes even when the connection attempt below fails and is only
        # logged.
        # NOTE(review): assumes _vaem_init does not rely on these being unset and does
        # not itself populate version / active_valves - confirm against _vaem_init.
        self.version = None
        self._read_param = {
            "address": 0,
            "length": 0x07,
        }
        self._write_param = {
            "address": 0,
            "length": 0x07,
        }
        self.active_valves = [0, 0, 0, 0, 0, 0, 0, 0]
        try:
            self.client = ModbusTcpClient(host=self._config.ip, port=self._config.port)
            # NOTE(review): the return value of connect() is not inspected here -
            # confirm whether a failed connect is reported by return value.
            self.client.connect()
            self._vaem_init()
        except ConnectionError as e:
            logger.error("Connection error: %s. ", str(e))
        except ModbusIOException as io_error:
            logger.error("Modbus IO error: %s. ", str(io_error))
            logger.info(self._config)

__init__(config)

Constructor.

Parameters:

Name Type Description Default
config VAEMTCPConfig

A configuration class designated for ModbusTCP.

required

Returns:

Type Description

None

Raises:

Type Description
TypeError

Incorrect ModbusTCP config passed in

ConnectionError

Connection error with device

ModbusIOException

Error with Modbus connection

Source code in src/vaem/vaem_communication.py
def __init__(self, config: VAEMTCPConfig):
    """
    Constructor.

    Builds a ``ModbusTcpClient`` from the config's ``ip`` and ``port``, connects and
    runs ``_vaem_init``. A ``ConnectionError`` or ``ModbusIOException`` raised while
    doing so is logged and suppressed, so construction still completes.

    Args:
        config (VAEMTCPConfig): A configuration class designated for ModbusTCP.

    Returns:
        None

    Raises:
        TypeError: Incorrect ModbusTCP config passed in
    """
    super().__init__(config)
    if not isinstance(config, VAEMTCPConfig):
        config_type = type(config)
        raise TypeError(
            f"""Error: Config does not match the ModbusTCP backend
            The type passed in was: {config_type}"""
        )
    self._config = config
    # Plain state is assigned before any network I/O so the instance carries all
    # of its attributes even when the connection attempt below fails and is only
    # logged.
    # NOTE(review): assumes _vaem_init does not rely on these being unset and does
    # not itself populate version / active_valves - confirm against _vaem_init.
    self.version = None
    self._read_param = {
        "address": 0,
        "length": 0x07,
    }
    self._write_param = {
        "address": 0,
        "length": 0x07,
    }
    self.active_valves = [0, 0, 0, 0, 0, 0, 0, 0]
    try:
        self.client = ModbusTcpClient(host=self._config.ip, port=self._config.port)
        # NOTE(review): the return value of connect() is not inspected here -
        # confirm whether a failed connect is reported by return value.
        self.client.connect()
        self._vaem_init()
    except ConnectionError as e:
        logger.error("Connection error: %s. ", str(e))
    except ModbusIOException as io_error:
        logger.error("Modbus IO error: %s. ", str(io_error))
        logger.info(self._config)