JSON Message Parser

Overview

This datasource parser lets you subscribe to an MQTT topic and ingest JSON-formatted data, which may include a JSON value containing CSV as either a string or an array of strings. The parser is designed for ingesting data from a LoRaWAN server but can be configured for other cases.

Platform and Stream Naming Configuration Capabilities

  • All platform and stream ids are prefixed with dsapi-<ds_friendly_id>-
  • The platform name is the same as the platform id but without the prefix dsapi-<ds_friendly_id>-
  • Platform and stream ids are configurable. The infix is constructed using static content.
  • Stream ids consist of the platform id as a prefix, an infix defined by csv_message_types, and the configured header label as a suffix.
  • Non-CSV stream infixes can be configured via the JSONata query, although appending text in the query unnecessarily is not recommended.
  • The topic name is not used in this parser.
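
The naming rules above can be sketched in a few lines of Python. This is only an illustration, not the parser's implementation: the function names are hypothetical, and joining the CSV message type and header label with "." is an assumption based on the stream naming example in this document.

```python
def platform_id(ds_friendly_id: str, device_id: str, infix: str = "json") -> str:
    """Build dsapi-<ds_friendly_id>-<platform_id_infix>.<device_id>."""
    return f"dsapi-{ds_friendly_id}-{infix}.{device_id}"


def platform_name(ds_friendly_id: str, pid: str) -> str:
    """The platform name is the platform id without the dsapi-<ds_friendly_id>- prefix."""
    prefix = f"dsapi-{ds_friendly_id}-"
    return pid[len(prefix):] if pid.startswith(prefix) else pid


def csv_stream_id(pid: str, message_type: str, header_label: str) -> str:
    """Platform id + csv_message_types infix + header label.

    The "." separators are an assumption made for this sketch.
    """
    return f"{pid}.{message_type}.{header_label}"


# "myds" is a made-up data source friendly id.
pid = platform_id("myds", "your_platform_id", "your_org_and_project_namespace")
print(pid)
print(platform_name("myds", pid))
print(csv_stream_id(pid, "sensor_x", "BattV_volts"))
```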

JSONata Queries

JSONata is a lightweight query and transformation language for JSON data; it lets you write custom queries for your packet schema.

  • Senaps parsers may use lists of queries to maintain compatibility with different schema versions.
  • The first JSONata query in a list to get a non-null result is used.
  • JSONata allows lookup of JSON nodes and simple manipulation such as substring or mathematical scaling.
  • Test queries against sample messages in the JSONata Exerciser, keeping in mind the limitations of JSONata4Java.
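
The "first non-null result wins" rule for query lists can be illustrated as follows. This is a minimal sketch: lookup is a hypothetical stand-in that only follows dotted keys, whereas the parser evaluates real JSONata expressions; first_result is likewise a name invented for this example.

```python
from typing import Any, List, Optional


def lookup(message: dict, path: str) -> Any:
    """Stand-in for a JSONata query: follows dotted keys only (e.g. "a.b")."""
    node: Any = message
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def first_result(message: dict, queries: List[str]) -> Optional[Any]:
    """Return the result of the first query that yields a non-null value."""
    for query in queries:
        result = lookup(message, query)
        if result is not None:
            return result
    return None


# Two schema versions that carry the device id under different keys.
queries = ["end_device_ids.device_id", "nodeName"]
print(first_result({"end_device_ids": {"device_id": "node-a"}}, queries))  # node-a
print(first_result({"nodeName": "node-b"}, queries))  # node-b
```

Listing the query for the newest schema first keeps older messages working, because they simply fall through to a later query.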

Parser Configuration

platform_id_infix (Optional)

  • Used as part of the platform, stream, and group ids to help you search for them
  • Default is "json"
  • Naming example for a stream: dsapi-<ds_friendly_id>-<platform_id_infix>.<device_id>.phenomenon

platform_id_suffix_queries

  • A list of JSONata queries used to find the device_id shown in the naming example above.
  • The first matching query will be used.

lorawan_channel_plan_queries (Optional)

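Derives a LoRaWAN data rate stream from the bandwidth and spreading factor found in the message. This is useful for measuring the performance of adaptive data rate algorithms.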

  • label (Optional)
    • The data rate stream id suffix
    • Default is "LORAWAN.data_rate"
  • channel_plan
    • Valid value: AS923
    • Other channel plans may be available in the future
  • bandwidth_queries
    • A list of JSONata queries to find the value of the bandwidth
  • spreading_factor_queries
    • A list of JSONata queries to find the value of the spreading factor
  • sample_period (Optional)
    • Typical time duration between samples, formatted as ISO 8601
    • Default is null
  • reporting_period (Optional)
    • Typical time duration between messages uploading a batch of samples, formatted as ISO 8601
    • Default is null
  • stream_metadata (Optional)
    • The data rate stream metadata
    • Default is null

timestamp_queries (Optional with warning)

A list of timestamp queries used to extract and decode a timestamp that will accompany data extracted from JSON values (not CSV data).

  • Only necessary if you are persisting data from JSON values.
  • The first successful timestamp extraction will be used.
  • If no timestamp can be extracted, the time of arrival at Senaps is used with a warning message logged.

query

  • WARNING: Leaving this blank results in the time of arrival at Senaps being used. This is not recommended: if old messages are replayed, for example when queued messages are retried after a system outage, the timestamp will be incorrect.
  • The query is in the format of a JSONata query.

time_format (Optional)

  • Default is any ISO 8601 format supported by the Joda-Time dateTimeParser method
  • A DateTimeFormat pattern as defined in Joda-Time

time_zone (Optional)

  • Default is UTC
  • Canonical ID for a timezone as listed in Joda-Time
  • e.g. "Australia/Hobart", "UTC", "+1000"
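
The timestamp fallback behaviour described in this section can be sketched as below. This is an approximation under stated assumptions: the parser uses JSONata queries and Joda-Time patterns, while this sketch uses plain top-level key lookups and Python's datetime.fromisoformat; extract_timestamp and its now parameter are hypothetical.

```python
import logging
from datetime import datetime, timezone
from typing import List, Optional

log = logging.getLogger("timestamp_sketch")


def extract_timestamp(message: dict, keys: List[str],
                      now: Optional[datetime] = None) -> datetime:
    """Return the first parseable timestamp, else the arrival time with a warning."""
    for key in keys:
        raw = message.get(key)
        if raw is None:
            continue
        try:
            parsed = datetime.fromisoformat(raw)
        except (TypeError, ValueError):
            continue  # try the next query in the list
        # No zone in the text: apply the default time_zone (UTC).
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed
    log.warning("No timestamp extracted; using time of arrival")
    return now or datetime.now(timezone.utc)


print(extract_timestamp({"ts": "2021-11-09T23:36:50"}, ["received_at", "ts"]))
```

Replayed messages are the reason the fallback is risky: the arrival time then reflects when the message was retried, not when the sample was taken.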

csv_payload_queries (Optional)

  • A list of JSONata queries to find CSV data.
  • If not defined, no CSV data will be extracted.

json_queries (Optional)

A map in which each key becomes the stream suffix (label) when a stream is created, and each value contains the JSONata queries and stream metadata for that stream.

  • queries
    • A list of JSONata queries to find the value, which may be an int, a float, or a string containing an int or float
  • sample_period (Optional)
    • Typical time duration between samples, formatted as ISO 8601
    • Default is null
  • reporting_period (Optional)
    • Typical time duration between messages uploading a batch of samples, formatted as ISO 8601
    • Default is null
  • stream_metadata (Optional)
    • The stream metadata
    • Default is null

csv_message_types (Optional)

Refer to CSV#csv_message_types

Basic CSV Configuration Example

For this configuration example:

platform_id_infix

Each platform and stream will be prefixed with dsapi-<ds_friendly_id>-your_org_and_project_namespace.

platform_id_suffix_queries

The result of the first JSONata query in the list that returns a value is appended to the platform prefix derived in the previous step, i.e., dsapi-<ds_friendly_id>-your_org_and_project_namespace.your_platform_id

csv_payload_queries

The first JSONata query in the list that returns a value defines where to find the CSV payload that needs to be decoded.

csv_message_types

Defines how to decode the CSV payload. Refer to CSV#csv_message_types

Example Configuration

{
  "platform_id_infix": "your_org_and_project_namespace",
  "platform_id_suffix_queries": [
    "id"
  ],
  "csv_payload_queries": [
    "csv"
  ],
  "csv_message_types": {
    "sensor_x": {
      "headers": [
        {
          "label": "BattV_volts"
        },
        {
          "label": "PTemp_C"
        }
      ]
    }
  }
}

Example Payloads

Example payloads 1 and 2 below are equivalent. Sending the CSV data as a single string is preferred because it is processed more efficiently; the array option is provided as a more readable alternative.

Example payload 1

{
  "id": "your_platform_id",
  "csv": {
    "sensor_x": "2021-11-09T23:36:50,3519,8\n2021-11-10T00:03:01,3515,0\n2021-11-10T00:18:02,3509,0\n2021-11-10T22:30:00,3980,10.41\n2021-11-10T22:45:00,4008,9.76\n2021-11-10T23:15:01,4067,8.63"
  }
}

Example payload 2

{
  "id": "your_platform_id",
  "csv": {
    "sensor_x": [
      "2021-11-09T23:36:50,3519,8",
      "2021-11-10T00:03:01,3515,0",
      "2021-11-10T00:18:02,3509,0",
      "2021-11-10T22:30:00,3980,10.41",
      "2021-11-10T22:45:00,4008,9.76",
      "2021-11-10T23:15:01,4067,8.63"
    ]
  }
}
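
The equivalence of the two payload shapes can be illustrated with a small normalisation step. This is a sketch only: csv_rows is a hypothetical helper, and its default delimiters mirror the "\n" and "," used in the example payloads. In these payloads the first column is a timestamp and the remaining columns line up with the configured headers.

```python
from typing import List, Union


def csv_rows(value: Union[str, List[str]], line_delimiter: str = "\n",
             column_delimiter: str = ",") -> List[List[str]]:
    """Normalise a CSV string or an array of CSV lines into rows of columns."""
    lines = value.split(line_delimiter) if isinstance(value, str) else list(value)
    # Skip empty lines, e.g. the one produced by a trailing line delimiter.
    return [line.split(column_delimiter) for line in lines if line]


as_string = "2021-11-09T23:36:50,3519,8\n2021-11-10T00:03:01,3515,0"
as_array = ["2021-11-09T23:36:50,3519,8", "2021-11-10T00:03:01,3515,0"]

# Both shapes produce identical rows.
assert csv_rows(as_string) == csv_rows(as_array)
print(csv_rows(as_string)[0])  # ['2021-11-09T23:36:50', '3519', '8']
```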

Command line tests (Bash)

export SENAPS=senaps.eratos.com && export DS_USERNAME=<TODO> && export DS_PASSWORD=<TODO> && export DS_FRIENDLY_ID=<TODO>

t=$(date -u +%Y-%m-%dT%T);mosquitto_pub -h $SENAPS -p 1883 -u "dsapi-${DS_FRIENDLY_ID}:${DS_USERNAME}" -P "${DS_PASSWORD}" -t topic_name_not_relevant -m '{ "id": "your_platform_id", "csv": { "sensor_x": "2021-11-09T23:36:50,3519,8\n2021-11-10T00:03:01,3515,0\n2021-11-10T00:18:02,3509,0\n2021-11-10T22:30:00,3980,10.41\n2021-11-10T22:45:00,4008,9.76\n'"${t}"',4067,'$((1 + $RANDOM % 10))'" }}'

t=$(date -u +%Y-%m-%dT%T);mosquitto_pub -h $SENAPS -p 1883 -u "dsapi-${DS_FRIENDLY_ID}:${DS_USERNAME}" -P "${DS_PASSWORD}" -t topic_name_not_relevant -m '{"id": "your_platform_id","csv": {"sensor_x": ["2021-11-09T23:36:50,3519,8","2021-11-10T00:03:01,3515,0","2021-11-10T00:18:02,3509,0","2021-11-10T22:30:00,3980,10.41","2021-11-10T22:45:00,4008,9.76","'$t',4067,'$((1 + $RANDOM % 10))'"]}}'

Initialisation Checks

If the parser has successfully passed configuration validation and connected to a data source, you will see the log message:

JSON Parser has started and is ready for incoming messages

Basic JSON Configuration Example

For this configuration example:

timestamp_queries

The result of the first JSONata query in the list that returns a value is used as the timestamp saved with the time series values extracted from the JSON.

Because we have not defined time_format or time_zone, any ISO 8601 formatted datetime will be parsed with the UTC timezone.

json_queries

weather.temperature

Optional query which will search for the key t and parse the value to create the temperature stream dsapi-<ds_friendly_id>-your_org_and_project_namespace.your_platform_id.weather.temperature

Note that the value of the search may be an int, float, or a string containing an int or float.

weather.humidity

Optional query which will search for the key h and parse the value to create the humidity stream dsapi-<ds_friendly_id>-your_org_and_project_namespace.your_platform_id.weather.humidity

status.battery

Optional query which will search for the key b nested under s (query s.b) and parse the value to create the battery stream dsapi-<ds_friendly_id>-your_org_and_project_namespace.your_platform_id.status.battery

Example Configuration

{
  "platform_id_infix": "your_org_and_project_namespace",
  "platform_id_suffix_queries": [
    "id"
  ],
  "timestamp_queries": [
    {
      "query": "ts"
    }
  ],
  "json_queries": {
    "weather.temperature": {
      "queries": [
        "t"
      ]
    },
    "weather.humidity": {
      "queries": [
        "h"
      ]
    },
    "status.battery": {
      "queries": [
        "s.b"
      ]
    }
  }
}

In the example payload below, keys are kept short to minimize packet size.

Example Payload

{
  "id": "your_platform_id",
  "ts": "2020-11-09T23:36:50",
  "t": 22.3,
  "h": 88,
  "s": {
    "b": "15.4"
  }
}
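
Because a queried value may be an int, a float, or a string containing an int or float, a coercion step along the following lines is implied. This is a hedged sketch: to_number is a hypothetical helper, and returning None for anything else (including booleans) is an assumption about how unparseable values might be treated.

```python
from typing import Any, Optional


def to_number(value: Any) -> Optional[float]:
    """Coerce an int, float, or numeric string to float; otherwise return None."""
    if isinstance(value, bool):
        return None  # assumption: booleans are not treated as numbers
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        try:
            return float(value.strip())
        except ValueError:
            return None
    return None


payload = {"t": 22.3, "h": 88, "s": {"b": "15.4"}}
print(to_number(payload["t"]), to_number(payload["h"]), to_number(payload["s"]["b"]))
# 22.3 88.0 15.4
```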

Command line tests (Bash)

export SENAPS=senaps.eratos.com && export DS_USERNAME=<TODO> && export DS_PASSWORD=<TODO> && export DS_FRIENDLY_ID=<TODO>

t=$(date -u +%Y-%m-%dT%T);mosquitto_pub -h $SENAPS -p 1883 -u "dsapi-${DS_FRIENDLY_ID}:${DS_USERNAME}" -P "${DS_PASSWORD}" -t topic_name_not_relevant -m '{ "id": "your_platform_id", "ts": "'"${t}"'", "t": 22.3, "h": 88, "s": { "b": '"$((1 + $RANDOM % 10))"' }}'

Advanced Parser Configuration Guide

For this configuration example:

  • Some values in the example configuration are default values that could have been omitted but were included to make it obvious what parameters are available.
  • Most query lists contain two JSONata queries so that the parser can receive messages from different LoRa servers that use different JSON schemas.
  • Metadata is created for each stream to improve searchability and usability of data.
  • lorawan_channel_plan_queries uses the bandwidth and spreading factor embedded in the JSON to derive the LoRaWAN data rate.

{
    "platform_id_infix": "jsonPayloadTest",
    "platform_id_suffix_queries": [
        "end_device_ids.device_id",
        "nodeName"
    ],
    "lorawan_channel_plan_queries": {
        "channel_plan": "AS923",
        "bandwidth_queries": [
            "uplink_message.settings.data_rate.lora.bandwidth/1000",
            "txInfo.dataRate.bandwidth"
        ],
        "spreading_factor_queries": [
            "uplink_message.settings.data_rate.lora.spreading_factor",
            "txInfo.dataRate.spreadFactor"
        ],
        "stream_metadata": {
            "observedProperty": "http://registry.it.csiro.au/def/qudt/1.1/qudt-quantity/DataRate",
            "unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/Unitless",
            "interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
            "type": ".ScalarStreamMetaData"
        }
    },
    "timestamp_queries": [
        {
            "query": "received_at",
            "time_zone": "UTC",
            "time_format": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSZ"
        },
        {
            "query": "rxInfo.time",
            "time_zone": "UTC",
            "time_format": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSZ"
        }
    ],
    "json_queries": {
        "LORAWAN.rssi": {
            "queries": [
                "uplink_message.rx_metadata[0].rssi",
                "rxInfo[0].rssi"
            ],
            "sample_period": "PT2H",
            "reporting_period": "PT4H",
            "stream_metadata": {
                "observedProperty": "http://registry.it.csiro.au/def/qudt/1.1/qudt-quantity/SignalStrength",
                "unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/DecibelReferredToOneMilliwatt",
                "interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
                "type": ".ScalarStreamMetaData"
            }
        },
        "LORAWAN.snr": {
            "queries": [
                "uplink_message.rx_metadata[0].snr",
                "rxInfo[0].loRaSNR"
            ],
            "sample_period": "PT2H",
            "reporting_period": "PT4H",
            "stream_metadata": {
                "observedProperty": "http://registry.it.csiro.au/def/qudt/1.1/qudt-quantity/SignalStrength",
                "unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/Unitless",
                "interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
                "type": ".ScalarStreamMetaData"
            }
        },
        "internal.temperature": {
            "queries": [
                "uplink_message.decoded_payload.t"
            ],
            "sample_period": "PT2H",
            "reporting_period": "PT4H",
            "stream_metadata": {
                "observedProperty": "http://registry.it.csiro.au/def/environment/property/air_temperature",
                "unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/DegreeCelsius",
                "interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
                "type": ".ScalarStreamMetaData"
            }
        },
        "internal.humidity": {
            "queries": [
                "uplink_message.decoded_payload.h"
            ],
            "sample_period": "PT2H",
            "reporting_period": "PT4H",
            "stream_metadata": {
                "observedProperty": "http://registry.it.csiro.au/def/senaps/property/RelativeHumidity",
                "unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/Percent",
                "interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
                "type": ".ScalarStreamMetaData"
            }
        },
        "status.battery": {
            "queries": [
                "uplink_message.decoded_payload.b"
            ],
            "sample_period": "PT2H",
            "reporting_period": "PT4H",
            "stream_metadata": {
                "observedProperty": "http://registry.it.csiro.au/def/senaps/property/BatteryVoltage",
                "unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/Volt",
                "interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
                "type": ".ScalarStreamMetaData"
            }
        }
    },
    "csv_payload_queries": [
        "uplink_message.decoded_payload",
        "data"
    ],
    "csv_message_types": {
        "csv_th": {
            "time_zone": "Australia/Hobart",
            "time_format": "yyyy-MM-dd'T'HH:mm:ss",
            "sample_period": "PT1H",
            "reporting_period": "PT6H",
            "column_delimiter": ",",
            "line_delimiter": "\n",
            "headers": [
                {
                    "label": "temperature",
                    "stream_metadata": {
                        "observedProperty": "http://registry.it.csiro.au/def/environment/property/air_temperature",
                        "unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/DegreeCelsius",
                        "interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
                        "type": ".ScalarStreamMetaData"
                    }
                },
                {
                    "label": "humidity",
                    "stream_metadata": {
                        "observedProperty": "http://registry.it.csiro.au/def/senaps/property/RelativeHumidity",
                        "unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/Percent",
                        "interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
                        "type": ".ScalarStreamMetaData"
                    }
                }
            ]
        },
        "csv_croc": {
            "time_zone": "UTC",
            "time_format": "yyyy-MM-dd'T'HH:mm:ss",
            "sample_period": "P1D",
            "reporting_period": "P6D",
            "column_delimiter": ",",
            "line_delimiter": "\n",
            "headers": [
                {
                    "label": "crocodiles",
                    "stream_metadata": {
                        "observedProperty": "http://registry.it.csiro.au/def/environment/property/crocodile_concentration",
                        "unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/KilogramPerMeter",
                        "interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
                        "type": ".ScalarStreamMetaData"
                    }
                }
            ]
        },
        "t": {
            "headers": []
        },
        "h": {
            "headers": []
        },
        "b": {
            "headers": []
        }
    }
}

The example payload below is a typical response from TTI (https://www.thethingsindustries.com/) when a payload formatter is configured to decode the payload.
"...." signifies where lines or text have been omitted for brevity.

The example below demonstrates sending a valid packet using mosquitto_pub from a Bash command line.

export SENAPS=senaps.eratos.com && export DS_USERNAME=<TODO> && export DS_PASSWORD=<TODO> && export DATA_SOURCE_FRIENDLY_ID=<TODO>

# Save the above example payload as test.json
t=$(date -u +%Y-%m-%dT%T);mosquitto_pub -h $SENAPS -p 1883 -u "dsapi-${DATA_SOURCE_FRIENDLY_ID}:${DS_USERNAME}" -P "${DS_PASSWORD}" -t topic_name_not_relevant -s < ./test.json