Water System Monitoring Deep Dive
Technical deep dive on an inexpensive cloud-based monitoring solution for our neighborhood water system

Everything should be made as simple as possible, but not simpler
~Albert Einstein
This is a continuation of a previous post on a cloud-based water monitoring solution. In this post I dive into the technical solution and the services it leverages, including code samples. The sample templates below can be stitched together using the AWS Serverless Application Model (SAM) and GitHub Actions to deploy a CI/CD pipeline for the full stack. A good tutorial is located here. The full solution can also be built manually through the console, or by importing the CloudFormation templates below for each of the key service components.
The first major design decision was which services and protocols to use as the cloud interface. My hand was forced, as I had already decided to purchase the AWS gateway by National Control Devices (NCD), which was built with native AWS IoT support for the sensors and gateways. However, there are some inherent benefits to using IoT. One is that it provides a readily available, secure endpoint to the cloud. IoT (specifically the MQTT protocol) uses a publish / subscribe (pub / sub) model as opposed to poll / response. This means that the devices (and ultimately the gateway) are the single source of truth and publish data to a message broker. The message broker then allows consumers to subscribe to topics.
The communication between the producer and consumer should be encrypted. Ideally, the device would allow certificate-based authentication and encryption by simply creating or providing a certificate (with public and private keys). The NCD gateway instead uses an Identity and Access Management (IAM) user with an access key ID and secret access key to allow programmatic access to publish to IoT. My preference would have been certificate-based authentication rather than an IAM user that needs to be managed, but ultimately I accepted the risk, as this was a monitoring-only solution and I would ensure that the secret access key was safely stored.
Regardless, there are some good practices to follow if you are going to go this route. AWS has a good reference document on IAM best practices. For my application, I wanted the IAM user to follow least privilege: its only permissions are to connect the AWS gateway to IoT and to publish messages to a topic that I allow. By providing a tight policy document and keeping my secret access key secret, I felt comfortable with the approach.
In addition to allowing for secure access, IoT allows for a serverless connection that can leverage public internet, negating a requirement to manage servers or network connections. By just creating the IAM user with policies I was able to quickly start consuming data into my AWS account.
This CloudFormation template will create an IAM user, store the credentials in Secrets Manager, and only allow MQTT publish to the topic endpoint that we specified:
IoTPolicy:
  Type: AWS::IAM::ManagedPolicy
  Properties:
    PolicyDocument:
      Version: '2012-10-17'
      Statement:
        # iot:Connect is evaluated against a client ARN, not a topic ARN.
        # The wildcard is an assumption; narrow it to the gateway's client ID if known.
        - Sid: AllowConnectIoTPolicy
          Effect: Allow
          Action:
            - 'iot:Connect'
          Resource: !Sub 'arn:aws:iot:${AWS::Region}:${AWS::AccountId}:client/*'
        - Sid: AllowPublishIoTPolicy
          Effect: Allow
          Action:
            - 'iot:Publish'
          Resource: !Sub 'arn:aws:iot:${AWS::Region}:${AWS::AccountId}:topic/myWaterTopic'
IoTDevice:
  Type: AWS::IAM::User
  Properties:
    Path: /iotdevice/
    UserName: !Ref AWS::StackName
    ManagedPolicyArns:
      - !Ref IoTPolicy
IoTCredentials:
  Type: AWS::IAM::AccessKey
  Properties:
    Status: Active
    UserName: !Ref IoTDevice
IoTCredentialsStored:
  Type: AWS::SecretsManager::Secret
  Properties:
    Name: !Sub /iotdevice/credentials/${IoTDevice}
    # Ref on an AccessKey returns the access key ID
    SecretString: !Sub '{"ACCESS_KEY":"${IoTCredentials}","SECRET_KEY":"${IoTCredentials.SecretAccessKey}"}'
Now that we are securely publishing data to IoT, I needed to think about the use cases and how we would consume the data. Our original intent was monitoring, and alarming if the water tank level reached a critical threshold or if water flow into or out of the system was outside a specified range. I also wanted the ability to trend level and flow and to provide graphics that the neighborhood could use.
Using these requirements I settled on SiteWise as the main engine for my asset and data model, graphics, and long-term trending. For alarming, SiteWise uses AWS IoT Events out of the box, but I ultimately decided to build my own alarming engine using Lambda, IoT and SNS. This was because SiteWise alarming did not support hysteresis or custom alarm configuration beyond basic thresholds. For example, one use case was to detect stale data (which could be a loss of signal or a pegged 4-20 mA signal). To detect this I wanted to look at the past 60 minutes of data and send an alarm if the values did not change.
One drawback of the custom alarm engine route is that you can no longer use the IoT Events capability to silence alarms once they have been sent. This meant I needed a mechanism to record whether an alarm had been sent and to silence it for a set period before it is sent again. To accomplish this I used the retained message support in AWS IoT. By publishing to an "alarm sent" topic, all clients subscribing to that topic know that an alarm was generated, and when. Subsequent reads of an alarm condition can check this topic and compare timestamps. If the sensor is still in alarm and the elapsed time is greater than the allowable silence period, the alarm is sent again.
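The silence check described above can be sketched as a small pure function. This is a minimal illustration rather than the exact production logic: the name `should_send_alarm` and its parameters are hypothetical, and the 8 hour default is an assumption chosen to match the `MAX_ALARM_TIME` constant used in the alarm engine code later in this post.

```python
def should_send_alarm(email_sent: bool,
                      last_sent_ts: int,
                      reading_ts: int,
                      silence_seconds: int = 8 * 60 * 60) -> bool:
    """Decide whether to notify for a sensor that is currently in alarm.

    email_sent and last_sent_ts stand in for the fields stored in the
    retained message; reading_ts is the timestamp of the latest reading.
    All timestamps are epoch seconds.
    """
    if not email_sent:
        # No notification has gone out yet for this alarm topic
        return True
    # Notification already sent: only repeat once the silence period has elapsed
    return abs(reading_ts - last_sent_ts) > silence_seconds
```

Keeping the decision separate from the IoT and SNS calls makes it easy to unit test the silence behavior without touching AWS.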
For this exercise I created a SiteWise Asset Model with the Attributes, Measurements and Transforms it needs, plus an Asset whose property Aliases are used by Lambda. The transforms below take the raw signals (4-20 mA) and calculate actual GPM from the meters. Similarly, the transforms calculate tank level in both feet and percent from the raw millimeter values. Once the Asset Model is created, it is fairly straightforward to create the portal via the CLI or the console and apply permissions.
# Create a basic water system object model
WaterMonitoringModel:
  Type: AWS::IoTSiteWise::AssetModel
  Properties:
    AssetModelDescription: Model for a Water Tank serving a neighborhood well
    AssetModelName: Water Monitoring Model
    AssetModelProperties:
      - DataType: DOUBLE
        LogicalId: SitewiseModelLowLevelAlarm
        Name: Low_Level_Alarm
        Type:
          TypeName: Attribute
          Attribute:
            DefaultValue: '1050'
      - DataType: DOUBLE
        LogicalId: SitewiseModelFlowMeterOORMA
        Name: Flow_Meter_OOR_MA
        Type:
          TypeName: Attribute
          Attribute:
            DefaultValue: '3.45'
      - DataType: DOUBLE
        LogicalId: SitewiseModelBadLevelAlarm
        Name: Bad_Level_Alarm
        Type:
          TypeName: Attribute
          Attribute:
            DefaultValue: '9999'
      - DataType: INTEGER
        LogicalId: SitewiseModelTankLevel
        Name: TankLevelValue
        Unit: 'mm'
        Type:
          TypeName: Measurement
      - DataType: INTEGER
        LogicalId: SitewiseModelAmbTemp
        Name: AmbientTemperature
        Unit: 'F'
        Type:
          TypeName: Measurement
      - DataType: DOUBLE
        LogicalId: SitewiseModelinFlowMA
        Name: inFlowMilliAmps
        Unit: 'mA'
        Type:
          TypeName: Measurement
      - DataType: DOUBLE
        LogicalId: SitewiseModeloutFlowMA
        Name: outFlowMilliAmps
        Unit: 'mA'
        Type:
          TypeName: Measurement
      - DataType: DOUBLE
        LogicalId: TankLevelPercentTransform
        Name: tankLevelPercent
        Unit: '%'
        Type:
          TypeName: Transform
          Transform:
            Expression: ' 100 / 3657.6 * ( 3657.6 - tanklevelvalue )'
            Variables:
              - Name: tanklevelvalue
                Value:
                  PropertyLogicalId: SitewiseModelTankLevel
      - DataType: DOUBLE
        LogicalId: TankLevelFeetTransform
        Name: tankLevelFeet
        Unit: 'ft'
        Type:
          TypeName: Transform
          Transform:
            Expression: ' 1 / 305 * ( tanklevelvalue )'
            Variables:
              - Name: tanklevelvalue
                Value:
                  PropertyLogicalId: SitewiseModelTankLevel
      - DataType: DOUBLE
        LogicalId: WellFlowGPMTransform
        Name: wellFlowGPM
        Unit: 'GPM'
        Type:
          TypeName: Transform
          Transform:
            Expression: ' if(gt(wellflowma,5),((wellflowma - 4) / (( 20 - 4 ) / ( 20 - 0))),0)'
            Variables:
              - Name: wellflowma
                Value:
                  PropertyLogicalId: SitewiseModelinFlowMA
      - DataType: DOUBLE
        LogicalId: OutFlowGPMTransform
        Name: outFlowGPM
        Unit: 'GPM'
        Type:
          TypeName: Transform
          Transform:
            Expression: ' if(gt(outflowma,5),((outflowma - 4) / (( 20 - 4 ) / ( 20 - 0))),0)'
            Variables:
              - Name: outflowma
                Value:
                  PropertyLogicalId: SitewiseModeloutFlowMA
SitewiseAssetInstance:
  Type: AWS::IoTSiteWise::Asset
  Properties:
    AssetDescription: Instance of my water monitoring Asset
    AssetModelId: !Ref WaterMonitoringModel
    AssetName: Water Monitoring
    AssetProperties:
      - Alias: 'tank_level_mm'
        LogicalId: SitewiseModelTankLevel
      - Alias: 'tank_in_flow_ma'
        LogicalId: SitewiseModelinFlowMA
      - Alias: 'tank_out_flow_ma'
        LogicalId: SitewiseModeloutFlowMA
      - Alias: 'ambient_temperature'
        LogicalId: SitewiseModelAmbTemp
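The transform expressions in the asset model above can be sanity-checked locally before deploying. The sketch below mirrors the three expressions in plain Python. The function names are hypothetical, and the reading of the constants is inferred from the expressions themselves: the flow scaling appears to map 4-20 mA onto a 0-20 GPM span, and 3657.6 mm corresponds to 12 ft.

```python
def flow_gpm(milliamps: float) -> float:
    """Mirror of: if(gt(ma,5),((ma - 4) / ((20 - 4) / (20 - 0))),0)"""
    if milliamps > 5:
        # 16 mA of signal span spread over a 20 GPM range
        return (milliamps - 4) / ((20 - 4) / (20 - 0))
    # Readings at or below 5 mA are reported as zero flow
    return 0.0


def tank_level_percent(raw_mm: float) -> float:
    """Mirror of: 100 / 3657.6 * (3657.6 - tanklevelvalue)"""
    return 100 / 3657.6 * (3657.6 - raw_mm)


def tank_level_feet(raw_mm: float) -> float:
    """Mirror of: 1 / 305 * tanklevelvalue"""
    return 1 / 305 * raw_mm


if __name__ == "__main__":
    # Print a few sample conversions for a quick visual check
    for ma in (4.0, 5.0, 12.0, 20.0):
        print(f"{ma} mA -> {flow_gpm(ma)} GPM")
    for mm in (0, 1050, 3657.6):
        print(f"{mm} mm -> {tank_level_percent(mm)} %, {tank_level_feet(mm)} ft")
```

Note that with these expressions a larger raw millimeter reading gives a lower percentage, which is consistent with the low-level alarm firing when the measured value exceeds its threshold.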
With the SiteWise model created, I still needed a mechanism for piping data from IoT into SiteWise. I accomplished this by creating a rule in IoT that listens on a topic and then invokes a Lambda to publish data to SiteWise using the measurement aliases created above. The Lambda has to work out which signal published the new message, which it does by inspecting the payload for the expected measurement keys. A total of 3 Lambda functions are used for the full solution. A description of each is below:
Lambda | Description | Triggers |
---|---|---|
Sitewise Publisher | ETL for sensor payload | Sensor Publish on topic |
Sitewise Alarm Engine | Alarm condition monitor | EventBridge (Every 10 minutes) |
Current ambient temperature | Google temperature scraper for local conditions | EventBridge (Every 10 minutes) |
Here is sample code for the Sitewise Publisher. This Lambda is configured to run from the IoT rule. Configure the alias IDs as environment variables and let the Lambda assume a role with IoT SiteWise permissions.
import json
import math
import os
import time

import boto3

# SiteWise alias IDs are defined as environment variables in Lambda


def lambda_handler(event, context):
    mytime = time.time()
    mysec = math.floor(mytime)
    entryID = 'Entry_' + str(mysec)
    propValList = []
    propValDict = {}
    if 'mA_1' in event['data']:  # flow #1
        doubVal = event['data']['mA_1']
        alliasID = os.environ['flow_in_alias_id']
        propValDict['value'] = {'doubleValue': doubVal}
        propValDict['timestamp'] = {'timeInSeconds': mysec}
        propValList.append(propValDict)
        write_sitewise_values(entryID, alliasID, propValList)
        # reset so the next signal starts with a fresh entry
        propValList = []
        propValDict = {}
    if 'mA_2' in event['data']:  # flow #2
        doubVal = event['data']['mA_2']
        alliasID = os.environ['flow_out_alias_id']
        propValDict['value'] = {'doubleValue': doubVal}
        propValDict['timestamp'] = {'timeInSeconds': mysec}
        propValList.append(propValDict)
        write_sitewise_values(entryID, alliasID, propValList)
        propValList = []
        propValDict = {}
    elif 'level' in event['data']:  # level
        intVal = event['data']['level']
        alliasID = os.environ['level_alias_id']
        propValDict['value'] = {'integerValue': intVal}
        propValDict['timestamp'] = {'timeInSeconds': mysec}
        propValList.append(propValDict)
        if intVal != 9999:  # Do not write bad value to SiteWise; let the alarming engine deal with it
            write_sitewise_values(entryID, alliasID, propValList)
    return {
        'statusCode': 200,
        'body': json.dumps('Publish to Sitewise Successful!')
    }


def write_sitewise_values(entryID, alliasID, propValList):
    client = boto3.client('iotsitewise')
    response = client.batch_put_asset_property_value(
        entries=[
            {
                'entryId': entryID,
                'propertyAlias': alliasID,
                'propertyValues': propValList
            },
        ]
    )
Once data is piped into Sitewise the service handles everything else. Just create a portal and populate meaningful trends. The alarming engine is the most critical function in this solution. As mentioned earlier in the post it is leveraging Retained Message support and using SNS to send out alarms. The basic code looks like this:
import os
import json
import boto3
import botocore
import time
import math


def lambda_handler(event, context):
    MAX_ALARM_TIME = 8  # hours an alarm stays silenced after an email is sent
    MAX_STALE_TIME = 1  # hours of history checked for stale data
    userEmail = os.environ['User_Email']
    asset_Id = os.environ['asset_ID']
    # AssetAlarmProperties
    tankLevelAlarmID = os.environ['tank_level_alarm_id']
    flowInmaAlarmID = os.environ['flow_in_alarm_id']
    flowOutmaAlarmID = os.environ['flow_out_alarm_id']
    sitewisePortal = os.environ['sitewise_portal']
    snsArn = os.environ['sns_arn']
    # substrings used to pick the alarm type for each sensor
    regex_level_string = 'tank_level'
    regex_flow_string = 'flow_ma'
    # use IoT retained messages for alarming, uses known sensor failure modes
    errorTopicList = ['tank_level_mm', 'tank_in_flow_ma', 'tank_out_flow_ma', 'tank_level_mm_stale', 'tank_in_flow_ma_stale', 'tank_out_flow_stale']
    defaultPayload = {"Email_Sent": False, "Timestamp": int(round(time.time()))}
    # SiteWise alarm thresholds: [alias, value type, threshold name, threshold property id]
    propertyAliasList = [['tank_level_mm', 'integerValue', 'Low_Level_Alarm', tankLevelAlarmID],
                         ['tank_in_flow_ma', 'doubleValue', 'Flow_Meter_OOR_MilliAmp', flowInmaAlarmID],
                         ['tank_out_flow_ma', 'doubleValue', 'Flow_Meter_OOR_MilliAmp', flowOutmaAlarmID]]
    runtime_region = os.environ['AWS_REGION']
    iot_client = boto3.client('iot-data', region_name=runtime_region)
    sw_client = boto3.client('iotsitewise', region_name=runtime_region)
    # ensure retained messages exist on IoT Core; if not, create them
    for item in errorTopicList:
        try:
            response = iot_client.get_retained_message(topic=f'alarms/{item}')
        except botocore.exceptions.ClientError as error:  # repost default payload for alarm topic
            if error.response['Error']['Code'] == 'ResourceNotFoundException':
                iot_write_retained_msg(runtime_region, f'alarms/{item}', defaultPayload)
            else:
                pass
    # First check threshold based alarming: loop through sensors to check
    for propVal in propertyAliasList:
        Valresponse = sw_client.get_asset_property_value(
            propertyAlias=str(propVal[0]).strip()
        )
        measured_Value = Valresponse['propertyValue']['value'][propVal[1]]
        timeStamp = Valresponse['propertyValue']['timestamp']['timeInSeconds']
        response = sw_client.get_asset_property_value(
            assetId=asset_Id,
            propertyId=propVal[3]
        )
        threshold = response['propertyValue']['value']['doubleValue']
        response = iot_client.get_retained_message(topic=f'alarms/{propVal[0]}')
        val = response['payload'].decode('utf-8')
        iotRmEmailState = json.loads(val)
        # seconds between the latest reading and the last recorded alarm state
        StaleValue = abs(timeStamp - int(iotRmEmailState['Timestamp']))
        # Tank level threshold check
        # Use list comprehension with substring match
        if (([v for v in propVal if regex_level_string in v]) and measured_Value > threshold):
            cw_Payload = {'Sensor': str(propVal[0]), 'Timestamp': timeStamp, 'StaleValue': StaleValue, 'Measured_Value': measured_Value, 'Threshold': threshold, 'Email_Sent': iotRmEmailState['Email_Sent']}
            if (iotRmEmailState['Email_Sent'] == False or StaleValue > (MAX_ALARM_TIME*60*60)):
                messageStart = f'Water level of {measured_Value} mm exceeds threshold {threshold} mm. This is 35% of normal level. View Sitewise page for realtime level and flow (username: {userEmail}) \n'
                subjectEmail = 'Water level is low - Please conserve'
                send_alarm_sns(str(propVal[0]), measured_Value, threshold, messageStart, subjectEmail, sitewisePortal, snsArn)
                # update retained message for email sent
                iot_Payload = {"Email_Sent": True, "Timestamp": timeStamp}
                iot_write_retained_msg(runtime_region, f'alarms/{propVal[0]}', iot_Payload)
                print(f'Alarm is active, Email has been sent: {cw_Payload}')  # write to CloudWatch, alarm email was sent
            else:  # Tank level in alarm, currently alarm is silenced
                print(f'Alarm is active, Email is silenced: {cw_Payload}')  # write to CloudWatch, alarm is silenced but exceeds threshold
        # Check if 4-20mA sensors fail range
        # Currently checking low current range
        # Use list comprehension with substring match
        if (([v for v in propVal if regex_flow_string in v]) and measured_Value < threshold):
            cw_Payload = {'Sensor': str(propVal[0]), 'Timestamp': timeStamp, 'StaleValue': StaleValue, 'Measured_Value': measured_Value, 'Threshold': threshold, 'Email_Sent': iotRmEmailState['Email_Sent']}
            if (iotRmEmailState['Email_Sent'] == False or StaleValue > (MAX_ALARM_TIME*60*60)):
                messageStart = f'Flow sensor {propVal[0]} current of {measured_Value} mA is below threshold {threshold} mA. View Sitewise page for realtime level and flow (username: {userEmail}) \n'
                subjectEmail = f'Water flow sensor {propVal[0]} out of range - Check immediately'
                send_alarm_sns(str(propVal[0]), measured_Value, threshold, messageStart, subjectEmail, sitewisePortal, snsArn)
                # update retained message for email sent
                iot_Payload = {"Email_Sent": True, "Timestamp": timeStamp}
                iot_write_retained_msg(runtime_region, f'alarms/{propVal[0]}', iot_Payload)
                print(f'Alarm is active, Email has been sent: {cw_Payload}')  # write to CloudWatch, alarm email was sent
            else:  # flow sensor is in alarm, currently alarm is silenced
                print(f'Alarm is active, Email is silenced: {cw_Payload}')  # write to CloudWatch, alarm is silenced but is out of range
    # now check stale values for last 60 minutes
    myCurtime = time.time()
    myCursec = math.floor(myCurtime)
    myLastsec = myCursec - (MAX_STALE_TIME*60*60)
    for propVal in propertyAliasList:
        wellResponse = sw_client.get_asset_property_value_history(
            propertyAlias=str(propVal[0]).strip(),
            startDate=myLastsec,
            endDate=myCursec
        )
        histvalList = []
        for val in wellResponse['assetPropertyValueHistory']:
            if propVal[0] == 'tank_level_mm':
                histvalList.append(val['value']['integerValue'])
            else:
                histvalList.append(val['value']['doubleValue'])
        isStaleData, isStaleDataMsg = check_stale_values(histvalList, (MAX_STALE_TIME*60*60))
        cw_Payload = {'Sensor': str(propVal[0]), 'Timestamp': myCurtime, 'StaleTime': myLastsec, 'HistValList': histvalList}
        if isStaleData:
            # Sending the stale-data notification is not shown in this sample; log to CloudWatch
            print(f'{isStaleDataMsg}: {cw_Payload}')
    return {
        'statusCode': 200,
        'body': json.dumps('Neighborhood Water Monitor Completed!')
    }


# helper function to check if values are stale or missing
def check_stale_values(histvalList, timeRange):
    if len(histvalList) == 0:
        return True, f'No historical values exist for range {timeRange} seconds - no sitewise data'
    elif all(ele == histvalList[0] for ele in histvalList):
        return True, f'Sitewise is using stale data for range {timeRange} seconds'
    else:
        return False, 'Data is not stale'


# helper function updates retained message
def iot_write_retained_msg(runtime_region, topic_item, payloadVal):
    iot_client = boto3.client('iot-data', region_name=runtime_region)
    response = iot_client.publish(
        topic=topic_item,
        qos=1,
        retain=True,
        payload=json.dumps(payloadVal)
    )


# Send SNS
def send_alarm_sns(propVal, measured_Value, threshold, messageStart, subjectEmail, sitewisePortal, snsArn):
    message = messageStart
    message += f"Property Value: {propVal} \n"
    # kept unit-neutral because this helper is used for both level (mm) and flow (mA) alarms
    message += f"Measured Value: {measured_Value}, Threshold: {threshold} \n"
    message += "\n"
    message += sitewisePortal
    client = boto3.client('sns')
    response = client.publish(
        TargetArn=snsArn,
        Message=message,
        MessageStructure='text',
        Subject=subjectEmail,
    )
    return {
        'statusCode': 200,
        'body': json.dumps('Email Sent')
    }
The last fun part of the solution is providing a localized temperature without an actual sensor. For this, I just scrape Google for the current temperature in my city and publish it to SiteWise. This allows us to track water usage as a function of temperature over time.
import json
import bs4
import requests
import boto3
import time
import math


def lambda_handler(event, context):
    url = 'https://www.google.com/search?q=san+francisco+current+temperature'
    html = requests.get(url).content
    soup = bs4.BeautifulSoup(html, 'html.parser')
    # this class name is tied to the page markup at the time of writing
    full_temp = soup.find('div', attrs={'class': 'BNeawe iBp4i AP7Wnd'}).text
    makeTempStr = ''
    mytime = time.time()
    mysec = math.floor(mytime)
    entryID = 'Entry_' + str(mysec)
    propValList = []
    propValDict = {}
    # keep the characters up to the degree sign (code point 176)
    for val in full_temp:
        if not ord(val) == 176:
            makeTempStr += val
        else:
            break
    try:
        makeTempInt = int(makeTempStr)
    except ValueError:
        # nothing usable was scraped, so do not write a value to SiteWise
        print('error')
        return {
            'statusCode': 500,
            'body': json.dumps('Temperature could not be parsed')
        }
    intValue = makeTempInt
    alliasID = 'ambient_temperature'
    propValDict['value'] = {'integerValue': intValue}
    propValDict['timestamp'] = {'timeInSeconds': mysec}
    propValList.append(propValDict)
    write_sitewise_values(entryID, alliasID, propValList)
    return {
        'statusCode': 200,
        'body': json.dumps('Temperature Retrieved!')
    }


def write_sitewise_values(entryID, alliasID, propValList):
    client = boto3.client('iotsitewise')
    response = client.batch_put_asset_property_value(
        entries=[
            {
                'entryId': entryID,
                'propertyAlias': alliasID,
                'propertyValues': propValList
            },
        ]
    )
The only remaining task is to build your SNS notification topic and give the alarming Lambda the correct permissions to publish to it. I would love to see any comments on the solution above and potential improvements.