JSON Message Parser
Overview
This datasource parser lets you subscribe to an MQTT topic and ingest JSON data, which may include a JSON value containing CSV data, either as a single string or as an array of strings. The parser is designed for ingesting data from a LoRaWAN network server but can be configured for other use cases.
Platform and Stream Naming Configuration Capabilities
- All platform and stream ids are prefixed with dsapi-<ds_friendly_id>-
- The platform name is the same as the platform id but without the prefix dsapi-<ds_friendly_id>-
- Platform and stream ids are configurable. The infix is constructed using static content.
- CSV stream ids are prefixed with the platform id, followed by an infix defined by csv_message_types, with the configured header label as a suffix.
- Non-CSV stream infixes can be configured via the JSONata query (appending unnecessary text in the query is not recommended).
- The topic name is not used in this parser.
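The naming rules above can be sketched as a few Bash functions. This is an illustration only: the function names and the values myds and node42 are hypothetical, the "." separator is taken from the naming example under platform_id_infix, and the <platform id>.<message type>.<header label> layout for CSV streams is an assumption based on the description above.

```bash
#!/usr/bin/env bash
# Hypothetical helpers illustrating the id naming rules; not part of Senaps.

# dsapi-<ds_friendly_id>-<platform_id_infix>.<device_id>
platform_id() {
  local ds_friendly_id="$1" infix="$2" device_id="$3"
  printf 'dsapi-%s-%s.%s\n' "$ds_friendly_id" "$infix" "$device_id"
}

# The platform name is the platform id without the "dsapi-<ds_friendly_id>-" prefix
platform_name() {
  local ds_friendly_id="$1" id="$2"
  printf '%s\n' "${id#"dsapi-${ds_friendly_id}-"}"
}

# Assumed CSV stream layout: <platform id>.<csv message type>.<header label>
csv_stream_id() {
  local platform="$1" message_type="$2" label="$3"
  printf '%s.%s.%s\n' "$platform" "$message_type" "$label"
}

p=$(platform_id myds json node42)
echo "$p"                                # → dsapi-myds-json.node42
platform_name myds "$p"                  # → json.node42
csv_stream_id "$p" sensor_x BattV_volts  # → dsapi-myds-json.node42.sensor_x.BattV_volts
```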
JSONata Queries
JSONata is a lightweight query and transformation language for JSON data that allows custom queries for your packet schema.
- Senaps parsers may use lists of queries to maintain compatibility with different schema versions.
- The first JSONata query in a list that returns a non-null result is used.
- JSONata allows lookup of JSON nodes and simple manipulation such as substring or mathematical scaling.
- Test queries against sample messages in the JSONata Exerciser, keeping in mind the limitations of JSONata4Java.
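The "first non-null result wins" rule can be pictured with the small Bash sketch below. Here each argument stands in for the result that one query in the list produced, in list order; treating an empty string or the literal null as "no result" is an assumption of this sketch, and first_non_null is a hypothetical name, not a Senaps function.

```bash
#!/usr/bin/env bash
# Sketch of the "first non-null result wins" rule for query lists.
# Each argument is the result one query produced, in list order. An empty string
# or the literal "null" is treated as "no result" (an assumption for this sketch).
first_non_null() {
  local result
  for result in "$@"; do
    if [[ -n "$result" && "$result" != "null" ]]; then
      printf '%s\n' "$result"
      return 0
    fi
  done
  return 1  # no query in the list produced a result
}

# e.g. ["end_device_ids.device_id", "nodeName"] against a message that only has nodeName
first_non_null "" "node-42"    # → node-42
first_non_null "null" "0" "7"  # → 0 (zero is a value, not null)
```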
Parser Configuration
platform_id_infix (Optional)
- Used as part of the platform, stream, and group ids to help you search for them
- Default is "json"
- Naming example for a stream: dsapi-<ds_friendly_id>-<platform_id_infix>.<device_id>.phenomenon
platform_id_suffix_queries
- A list of JSONata queries used to find the <device_id> shown in the naming example above.
- The first matching query will be used.
lorawan_channel_plan_queries (Optional)
Useful for measuring the performance of adaptive data rate (ADR) algorithms.
- label (Optional)
  - The data rate stream id suffix
  - Default is "LORAWAN.data_rate"
- channel_plan
  - Currently, the only valid value is AS923
  - Other channel plans may be available in the future
- bandwidth_queries
  - A list of JSONata queries to find the value of the bandwidth
- spreading_factor_queries
  - A list of JSONata queries to find the value of the spreading factor
- sample_period (Optional)
  - Typical time duration between samples, formatted as an ISO 8601 duration
  - Default is null
- reporting_period (Optional)
  - Typical time duration between messages that upload a batch of samples, formatted as an ISO 8601 duration
  - Default is null
- stream_metadata (Optional)
  - The data rate stream metadata
  - Default is null
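As a rough illustration of how a data rate can be derived from the values found by bandwidth_queries and spreading_factor_queries, the sketch below maps them to an AS923 data rate index. It assumes the bandwidth is expressed in kHz (the advanced example below divides one schema's bandwidth by 1000) and uses the LoRa rows of the AS923 table from the LoRaWAN Regional Parameters. The parser's own mapping and the exact value it stores are not documented here and may differ; as923_data_rate is a hypothetical name.

```bash
#!/usr/bin/env bash
# Sketch: map bandwidth (kHz) and spreading factor to an AS923 LoRa data rate index.
# Assumes the LoRaWAN Regional Parameters AS923 table:
#   DR0..DR5 = SF12..SF7 at 125 kHz, DR6 = SF7 at 250 kHz (DR7 is FSK, not covered).
as923_data_rate() {
  local bandwidth_khz="${1%.0}" spreading_factor="$2"  # accept "125.0" as well as "125"
  if [[ "$bandwidth_khz" == 125 && "$spreading_factor" =~ ^[0-9]+$ ]] &&
     (( spreading_factor >= 7 && spreading_factor <= 12 )); then
    echo $(( 12 - spreading_factor ))
  elif [[ "$bandwidth_khz" == 250 && "$spreading_factor" == 7 ]]; then
    echo 6
  else
    echo "unsupported: ${bandwidth_khz} kHz / SF${spreading_factor}" >&2
    return 1
  fi
}

as923_data_rate 125 12   # → 0
as923_data_rate 125.0 9  # → 3
as923_data_rate 250 7    # → 6
```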
timestamp_queries (Optional with warning)
A list of timestamp queries used to extract and decode a timestamp that will accompany data extracted from JSON values (not CSV data).
- Only necessary if you are persisting data from JSON values.
- The first successful timestamp extraction will be used.
- If no timestamp can be extracted, the time of arrival at Senaps is used with a warning message logged.
query
- WARNING: If this is left blank, the time of arrival at Senaps is used. This is not recommended: if old messages are replayed (for example, when queued messages are retried after a system outage), their timestamps will be incorrect.
- A JSONata query that returns the timestamp.
time_format (Optional)
- A DateTimeFormat pattern as defined in Joda-Time
- Defaults to the ISO 8601 formats supported by the dateTimeParser method
time_zone (Optional)
- Default is UTC
- Canonical time zone ID as listed in Joda-Time, e.g. "Australia/Hobart", "UTC", "+1000"
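To illustrate how time_zone affects a timestamp that carries no UTC offset, the sketch below interprets such a timestamp in a given zone and normalises it to UTC. It uses GNU date rather than Joda-Time, only handles the yyyy-MM-dd'T'HH:mm:ss layout (time_format patterns are not interpreted), and named zones such as Australia/Hobart need the system time zone database; to_utc is a hypothetical helper, not the parser's implementation.

```bash
#!/usr/bin/env bash
# Sketch: read a timestamp without an offset as local time in $zone, print it in UTC.
# Requires GNU date (coreutils); BSD/macOS date uses different options.
to_utc() {
  local timestamp="$1" zone="${2:-UTC}"  # UTC mirrors the documented default
  local epoch
  # Replace the "T" separator with a space, then parse in the requested zone
  epoch=$(TZ="$zone" date -d "${timestamp/T/ }" +%s) || return 1
  date -u -d "@${epoch}" +%Y-%m-%dT%H:%M:%SZ
}

to_utc 2021-11-10T00:03:01                   # → 2021-11-10T00:03:01Z
to_utc 2021-11-10T00:03:01 Australia/Hobart  # → 2021-11-09T13:03:01Z with tzdata (AEDT, UTC+11)
```

This shows why time_zone matters for devices that report local time: a Hobart timestamp read with the UTC default would be stored 11 hours later than the true time during daylight saving.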
csv_payload_queries (Optional)
- A list of JSONata queries to find CSV data.
- If not defined, no CSV data will be extracted.
json_queries (Optional)
A map in which each key becomes the stream id suffix (label) when a stream is created. Each value contains the JSONata queries, plus the metadata used when the stream is created.
- queries
  - A list of JSONata queries to find the numeric (integer or float) or string value
- sample_period (Optional)
  - Typical time duration between samples, formatted as an ISO 8601 duration
  - Default is null
- reporting_period (Optional)
  - Typical time duration between messages that upload a batch of samples, formatted as an ISO 8601 duration
  - Default is null
- stream_metadata (Optional)
  - The stream metadata
  - Default is null
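The sample_period and reporting_period values are ISO 8601 durations such as PT2H or P1D. The hypothetical helper below converts the simple day/hour/minute/second forms used on this page to seconds, which can help sanity-check a configuration. It deliberately rejects years, months, weeks, and fractional values, which ISO 8601 also allows.

```bash
#!/usr/bin/env bash
# Hypothetical helper: convert a simple ISO 8601 duration (PnDTnHnMnS) to seconds.
# Years, months, weeks and fractional values are not supported by this sketch.
iso_duration_seconds() {
  local re='^P(([0-9]+)D)?(T(([0-9]+)H)?(([0-9]+)M)?(([0-9]+)S)?)?$'
  # Reject strings that do not match, and the empty forms "P" and "...T"
  if [[ ! "$1" =~ $re || "$1" == "P" || "$1" == *T ]]; then
    echo "unsupported duration: $1" >&2
    return 1
  fi
  local d="${BASH_REMATCH[2]:-0}" h="${BASH_REMATCH[5]:-0}"
  local m="${BASH_REMATCH[7]:-0}" s="${BASH_REMATCH[9]:-0}"
  # 10# forces base 10 so values such as "08" are not read as octal
  echo $(( 10#$d * 86400 + 10#$h * 3600 + 10#$m * 60 + 10#$s ))
}

iso_duration_seconds PT2H  # → 7200
iso_duration_seconds P6D   # → 518400
```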
csv_message_types (Optional)
Refer to CSV#csv_message_types
Basic CSV Configuration Example
For this configuration example:
platform_id_infix
Each platform and stream id will be prefixed with dsapi-<ds_friendly_id>-your_org_and_project_namespace.
platform_id_suffix_queries
The result of the first JSONata query that returns a value is appended to the prefix derived in the previous step, i.e., dsapi-<ds_friendly_id>-your_org_and_project_namespace.your_platform_id
csv_payload_queries
The first JSONata query that returns a value defines where to find the CSV payload that needs to be decoded.
csv_message_types
Defines how to decode the CSV payload; refer to CSV#csv_message_types
Example Configuration
{
"platform_id_infix": "your_org_and_project_namespace",
"platform_id_suffix_queries": [
"id"
],
"csv_payload_queries": [
"csv"
],
"csv_message_types": {
"sensor_x": {
"headers": [
{
"label": "BattV_volts"
},
{
"label": "PTemp_C"
}
]
}
}
}
Example Payloads
Example payloads 1 and 2 below are equivalent. Sending the CSV data as a single string is preferred because it is more efficient to process; the array form is provided as a more readable alternative.
Example payload 1
{
"id": "your_platform_id",
"csv": {
"sensor_x": "2021-11-09T23:36:50,3519,8\n2021-11-10T00:03:01,3515,0\n2021-11-10T00:18:02,3509,0\n2021-11-10T22:30:00,3980,10.41\n2021-11-10T22:45:00,4008,9.76\n2021-11-10T23:15:01,4067,8.63"
}
}
Example payload 2
{
"id": "your_platform_id",
"csv": {
"sensor_x": [
"2021-11-09T23:36:50,3519,8",
"2021-11-10T00:03:01,3515,0",
"2021-11-10T00:18:02,3509,0",
"2021-11-10T22:30:00,3980,10.41",
"2021-11-10T22:45:00,4008,9.76",
"2021-11-10T23:15:01,4067,8.63"
]
}
}
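The sketch below illustrates why the two payloads are equivalent: joining the array elements with newlines yields the single-string form, and each row is then split into one value per configured header. It assumes, as in these examples, that the first column is the timestamp and the remaining columns follow the order of headers, and that stream ids take the form <platform id>.<message type>.<header label>. decode_csv and the friendly id myds are hypothetical, not the parser's code.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print "<stream id> <timestamp> <value>" for every CSV cell.
# Assumes column 1 is the timestamp and later columns follow the header order.
decode_csv() {
  local platform="$1" message_type="$2" payload="$3"
  shift 3
  local -a labels=("$@") columns
  local line i
  while IFS= read -r line; do
    [[ -z "$line" ]] && continue  # skip blank lines
    IFS=',' read -r -a columns <<< "$line"
    for i in "${!labels[@]}"; do
      printf '%s.%s.%s %s %s\n' "$platform" "$message_type" "${labels[i]}" \
        "${columns[0]}" "${columns[i+1]}"
    done
  done <<< "$payload"
}

# Payload 1 style: one string with "\n" line delimiters
single=$'2021-11-09T23:36:50,3519,8\n2021-11-10T00:03:01,3515,0'
# Payload 2 style: an array of strings, joined here with newlines
rows=("2021-11-09T23:36:50,3519,8" "2021-11-10T00:03:01,3515,0")
joined=$(printf '%s\n' "${rows[@]}")

platform="dsapi-myds-your_org_and_project_namespace.your_platform_id"
decode_csv "$platform" sensor_x "$single" BattV_volts PTemp_C
# First line printed:
# dsapi-myds-your_org_and_project_namespace.your_platform_id.sensor_x.BattV_volts 2021-11-09T23:36:50 3519
if [[ "$(decode_csv "$platform" sensor_x "$single" BattV_volts PTemp_C)" == \
      "$(decode_csv "$platform" sensor_x "$joined" BattV_volts PTemp_C)" ]]; then
  echo "equivalent"
fi
```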
Command line tests (Bash)
export SENAPS=senaps.eratos.com && export DS_USERNAME=<TODO> && export DS_PASSWORD=<TODO> && export DS_FRIENDLY_ID=<TODO>
t=$(date -u +%Y-%m-%dT%T);mosquitto_pub -h $SENAPS -p 1883 -u "dsapi-${DS_FRIENDLY_ID}:${DS_USERNAME}" -P "${DS_PASSWORD}" -t topic_name_not_relevant -m '{ "id": "your_platform_id", "csv": { "sensor_x": "2021-11-09T23:36:50,3519,8\n2021-11-10T00:03:01,3515,0\n2021-11-10T00:18:02,3509,0\n2021-11-10T22:30:00,3980,10.41\n2021-11-10T22:45:00,4008,9.76\n'"${t}"',4067,'$((1 + $RANDOM % 10))'" }}'
t=$(date -u +%Y-%m-%dT%T);mosquitto_pub -h $SENAPS -p 1883 -u "dsapi-${DS_FRIENDLY_ID}:${DS_USERNAME}" -P "${DS_PASSWORD}" -t topic_name_not_relevant -m '{"id": "your_platform_id","csv": {"sensor_x": ["2021-11-09T23:36:50,3519,8","2021-11-10T00:03:01,3515,0","2021-11-10T00:18:02,3509,0","2021-11-10T22:30:00,3980,10.41","2021-11-10T22:45:00,4008,9.76","'$t',4067,'$((1 + $RANDOM % 10))'"]}}'
Initialisation Checks
If the parser has successfully passed configuration validation and connected to a data source, you will see the log message:
JSON Parser has started and is ready for incoming messages
Basic JSON Configuration Example
For this configuration example:
timestamp_queries
The result of the first JSONata query that returns a value is used as the timestamp saved with the time-series values extracted from the JSON.
Because time_format and time_zone are not defined, any ISO 8601 formatted datetime will be parsed using the UTC time zone.
json_queries
weather.temperature
Optional query that searches for the key t and parses its value to create the temperature stream dsapi-<ds_friendly_id>-your_org_and_project_namespace.your_platform_id.weather.temperature
Note that the value found may be an int, a float, or a string containing an int or float.
weather.humidity
Optional query that searches for the key h and parses its value to create the humidity stream dsapi-<ds_friendly_id>-your_org_and_project_namespace.your_platform_id.weather.humidity
status.battery
Optional query that searches for the key b nested under s (JSONata path s.b) and parses its value to create the battery stream dsapi-<ds_friendly_id>-your_org_and_project_namespace.your_platform_id.status.battery
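Because the value found by a json_queries entry may be a number or a string containing a number (as with "b": "15.4" in the example payload), a minimal sketch of that check might look like the following. The regular expression, the quote handling, and the name to_number are assumptions; the parser may accept or reject slightly different inputs.

```bash
#!/usr/bin/env bash
# Sketch: accept an int, a float, or a quoted string containing an int or float.
# The accepted syntax (sign, decimal point, exponent) is an assumption.
to_number() {
  local value="$1"
  local re='^[+-]?([0-9]+([.][0-9]*)?|[.][0-9]+)([eE][+-]?[0-9]+)?$'
  value="${value#\"}"  # drop a leading JSON string quote, if present
  value="${value%\"}"  # drop a trailing JSON string quote, if present
  if [[ "$value" =~ $re ]]; then
    printf '%s\n' "$value"
  else
    echo "not numeric: $1" >&2
    return 1
  fi
}

to_number 22.3      # → 22.3
to_number 88        # → 88
to_number '"15.4"'  # → 15.4
```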
Example Configuration
{
"platform_id_infix": "your_org_and_project_namespace",
"platform_id_suffix_queries": [
"id"
],
"timestamp_queries": [
{
"query": "ts"
}
],
"json_queries": {
"weather.temperature": {
"queries": [
"t"
]
},
"weather.humidity": {
"queries": [
"h"
]
},
"status.battery": {
"queries": [
"s.b"
]
}
}
}
In the example payload below, keys are kept short to minimize packet size.
Example Payload
{
"id": "your_platform_id",
"ts": "2020-11-09T23:36:50",
"t": 22.3,
"h": 88,
"s": {
"b": "15.4"
}
}
Command line tests (Bash)
export SENAPS=senaps.eratos.com && export DS_USERNAME=<TODO> && export DS_PASSWORD=<TODO> && export DS_FRIENDLY_ID=<TODO>
t=$(date -u +%Y-%m-%dT%T);mosquitto_pub -h $SENAPS -p 1883 -u "dsapi-${DS_FRIENDLY_ID}:${DS_USERNAME}" -P "${DS_PASSWORD}" -t topic_name_not_relevant -m '{ "id": "your_platform_id", "ts": "'"${t}"'", "t": 22.3, "h": 88, "s": { "b": '"$((1 + $RANDOM % 10))"' }}'
Advanced Parser Configuration Guide
For this configuration example:
- Some values in the example configuration are defaults that could have been omitted; they are included to show which parameters are available.
- Most settings have two JSONata queries so that the parser can receive messages from different LoRaWAN network servers that may use different JSON schemas.
- Metadata is created for each stream to improve the searchability and usability of the data.
- lorawan_channel_plan_queries: the bandwidth and spreading factor embedded in the JSON are used to derive the LoRaWAN data rate.
{
"platform_id_infix": "jsonPayloadTest",
"platform_id_suffix_queries": [
"end_device_ids.device_id",
"nodeName"
],
"lorawan_channel_plan_queries": {
"channel_plan": "AS923",
"bandwidth_queries": [
"uplink_message.settings.data_rate.lora.bandwidth/1000",
"txInfo.dataRate.bandwidth"
],
"spreading_factor_queries": [
"uplink_message.settings.data_rate.lora.spreading_factor",
"txInfo.dataRate.spreadFactor"
],
"stream_metadata": {
"observedProperty": "http://registry.it.csiro.au/def/qudt/1.1/qudt-quantity/DataRate",
"unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/Unitless",
"interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
"type": ".ScalarStreamMetaData"
}
},
"timestamp_queries": [
{
"query": "received_at",
"time_zone": "UTC",
"time_format": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSZ"
},
{
"query": "rxInfo.time",
"time_zone": "UTC",
"time_format": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSZ"
}
],
"json_queries": {
"LORAWAN.rssi": {
"queries": [
"uplink_message.rx_metadata[0].rssi",
"rxInfo[0].rssi"
],
"sample_period": "PT2H",
"reporting_period": "PT4H",
"stream_metadata": {
"observedProperty": "http://registry.it.csiro.au/def/qudt/1.1/qudt-quantity/SignalStrength",
"unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/DecibelReferredToOneMilliwatt",
"interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
"type": ".ScalarStreamMetaData"
}
},
"LORAWAN.snr": {
"queries": [
"uplink_message.rx_metadata[0].snr",
"rxInfo[0].loRaSNR"
],
"sample_period": "PT2H",
"reporting_period": "PT4H",
"stream_metadata": {
"observedProperty": "http://registry.it.csiro.au/def/qudt/1.1/qudt-quantity/SignalStrength",
"unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/Unitless",
"interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
"type": ".ScalarStreamMetaData"
}
},
"internal.temperature": {
"queries": [
"uplink_message.decoded_payload.t"
],
"sample_period": "PT2H",
"reporting_period": "PT4H",
"stream_metadata": {
"observedProperty": "http://registry.it.csiro.au/def/environment/property/air_temperature",
"unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/DegreeCelsius",
"interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
"type": ".ScalarStreamMetaData"
}
},
"internal.humidity": {
"queries": [
"uplink_message.decoded_payload.h"
],
"sample_period": "PT2H",
"reporting_period": "PT4H",
"stream_metadata": {
"observedProperty": "http://registry.it.csiro.au/def/senaps/property/RelativeHumidity",
"unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/Percent",
"interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
"type": ".ScalarStreamMetaData"
}
},
"status.battery": {
"queries": [
"uplink_message.decoded_payload.b"
],
"sample_period": "PT2H",
"reporting_period": "PT4H",
"stream_metadata": {
"observedProperty": "http://registry.it.csiro.au/def/senaps/property/BatteryVoltage",
"unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/Volt",
"interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
"type": ".ScalarStreamMetaData"
}
}
},
"csv_payload_queries": [
"uplink_message.decoded_payload",
"data"
],
"csv_message_types": {
"csv_th": {
"time_zone": "Australia/Hobart",
"time_format": "yyyy-MM-dd'T'HH:mm:ss",
"sample_period": "PT1H",
"reporting_period": "PT6H",
"column_delimiter": ",",
"line_delimiter": "\n",
"headers": [
{
"label": "temperature",
"stream_metadata": {
"observedProperty": "http://registry.it.csiro.au/def/environment/property/air_temperature",
"unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/DegreeCelsius",
"interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
"type": ".ScalarStreamMetaData"
}
},
{
"label": "humidity",
"stream_metadata": {
"observedProperty": "http://registry.it.csiro.au/def/senaps/property/RelativeHumidity",
"unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/Percent",
"interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
"type": ".ScalarStreamMetaData"
}
}
]
},
"csv_croc": {
"time_zone": "UTC",
"time_format": "yyyy-MM-dd'T'HH:mm:ss",
"sample_period": "P1D",
"reporting_period": "P6D",
"column_delimiter": ",",
"line_delimiter": "\n",
"headers": [
{
"label": "crocodiles",
"stream_metadata": {
"observedProperty": "http://registry.it.csiro.au/def/environment/property/crocodile_concentration",
"unitOfMeasure": "http://registry.it.csiro.au/def/qudt/1.1/qudt-unit/KilogramPerMeter",
"interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
"type": ".ScalarStreamMetaData"
}
}
]
},
"t": {
"headers": []
},
"h": {
"headers": []
},
"b": {
"headers": []
}
}
}
The example payload below is a typical message from The Things Industries (TTI) (https://www.thethingsindustries.com/) when a payload formatter is configured to decode the payload.
"...." signifies where lines or text have been omitted for brevity.
The example below demonstrates sending a valid packet from a Bash command line using mosquitto_pub.
export SENAPS=senaps.eratos.com && export DS_USERNAME=<TODO> && export DS_PASSWORD=<TODO> && export DS_FRIENDLY_ID=<TODO>
# Save the above example payload as test.json
mosquitto_pub -h $SENAPS -p 1883 -u "dsapi-${DS_FRIENDLY_ID}:${DS_USERNAME}" -P "${DS_PASSWORD}" -t topic_name_not_relevant -s < ./test.json