Skip to content

extension

grottext

grottext(conf: FakeConf, data: str, jsonmsg: str) -> int

Push grott data to the Home Assistant (HA) MQTT bus, with auto-discovery

Use the plugin interface of grott to grab the raw JSON and push it to the HA MQTT bus.

How it works
  • Parse the raw json
  • Discard if it's a buffered message
  • Cleanup the values field (spaces, invalid characters)
  • Check if the device is already configured
  • If not, create a new sensor for each value
  • Push the values to the topic homeassistant/grott/{device}/state

Parameters:

  • conf (FakeConf) –

    The configuration object from grott

  • data (str) –

    The raw data

  • jsonmsg (str) –

    The raw json message

Source code in src/grottext/ha/extension.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def _discovery_message(device_serial: str, key: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """Build the retained MQTT message that publishes one sensor configuration."""
    topic = CONFIG_TOPIC.format(
        sensor_type="sensor",
        device=device_serial,
        attribut=key,
    )
    return {
        "topic": topic,
        "payload": json.dumps(config),
        "retain": True,
        "qos": 1,
    }


def grottext(conf: FakeConf, data: str, jsonmsg: str) -> int:
    """Push grott data to the HA MQTT bus, with auto discovery.

    Use the plugin interface of grott to grab the raw JSON and push it to
    the HA MQTT bus.

    How it works:
        - Parse the raw json
        - Discard if it's a buffered message
        - Cleanup the values field (spaces, invalid characters)
        - Check if the device is already configured
        - If not, create a new sensor for each value
        - Push the values to the topic `homeassistant/grott/{device}/state`

    Parameters:
        conf: The configuration object from grott
        data: The raw data (not used by this function)
        jsonmsg: The raw json message

    Returns:
        0 when the values were published,
        1 when the MQTT host/port configuration is missing,
        2 when publishing the values failed,
        4 when the `grott_last_push` sensor configuration could not be built,
        5 when the message is buffered and was skipped,
        6 when a sensor configuration message could not be built,
        7 when the device is not configured (e.g. `conf.layout` is not set).
    """

    required_params = [
        MQTT_HOST_CONF_KEY,
        MQTT_PORT_CONF_KEY,
    ]
    if not all(param in conf.extvar for param in required_params):
        print("Missing configuration for ha_mqtt")
        return 1

    # Need to decode the json string
    payload = cast(Dict[str, Any], json.loads(jsonmsg))

    if payload.get("buffered") == "yes":
        # Skip buffered messages, HA doesn't support them
        if conf.verbose:
            print("\t - Grott HA - skipped buffered")
        return 5

    device_serial = payload["device"]
    values = cleanup_mqtt_values_field(payload["values"])

    # Send the last push in UTC with TZ
    dt = datetime.now(timezone.utc)
    # Add a new value to the existing values
    values["grott_last_push"] = dt.isoformat()

    # Layout can be undefined
    if not __pv_config.get(device_serial, False) and getattr(conf, "layout", None):
        configs_payloads = []
        print(f"\tGrott HA {__version__} - creating {device_serial} config in HA, {len(values)} to push")
        for key in values:
            # Prevent creating invalid MQTT topics
            if not is_valid_mqtt_topic(key):
                if conf.verbose:
                    print(f"\t[Grott HA] {__version__} skipped key: {key} invalid name")
                continue
            # Generate a configuration payload. A dedicated variable is used
            # so that a failure can never publish a stale value (the decoded
            # message or the previous key's configuration) for this key.
            try:
                sensor_config = make_payload(conf, device_serial, key)
            except AttributeError as e:
                print(f"\t[Grott HA] {__version__} error while generating key: {key}, error: {e}")
                continue
            if not sensor_config:
                print(f"\t[Grott HA] {__version__} skipped key: {key}")
                continue

            try:
                configs_payloads.append(_discovery_message(device_serial, key, sensor_config))
            except Exception as e:
                print(f"\t - [grott HA] {__version__} Exception while creating new sensor {key}: {e}")
                return 6

        # Create a virtual last_push key to allow tracking when there was the last data transmission.
        # NOTE(review): "grott_last_push" is already in `values`, so the loop
        # above may have queued a configuration for it as well - confirm
        # whether this second entry is still needed.
        try:
            key = "grott_last_push"
            last_push_config = make_payload(conf, device_serial, key)
            configs_payloads.append(_discovery_message(device_serial, key, last_push_config))
        except Exception as e:
            print(f"\t - [grott HA] {__version__} Exception while creating new sensor last push: {e}")
            return 4
        print(f"\tPushing {len(configs_payloads)} configurations payload to HA")
        publish_multiple(conf, configs_payloads)
        print("\tConfigurations pushed")
        # Now it's configured, no need to come back
        __pv_config[device_serial] = True

    if not __pv_config.get(device_serial, False):
        print(f"\t[Grott HA] {__version__} Can't configure device: {device_serial}")
        return 7

    # Push the values to the topic
    retain = conf.extvar.get(MQTT_RETAIN_CONF_KEY, False)
    # Anything that is not a real boolean falls back to "do not retain"
    if not isinstance(retain, bool):
        retain = False
    try:
        publish_single(
            conf,
            STATE_TOPIC.format(device=device_serial),
            json.dumps(values),
            retain=retain,
        )
    except Exception as e:
        print(f"[HA ext] - Exception while publishing - {e}")
        # Show the full traceback only when verbose output is requested
        if conf.verbose:
            traceback.print_exc()
        return 2
    return 0