February 20, 2023

Water System Monitoring Deep Dive

Technical deep dive on an inexpensive cloud-based monitoring solution for our neighborhood water system

Everything should be made as simple as possible, but not simpler
~Albert Einstein

This is a continuation of a previous post on a cloud-based water monitoring solution. In this post I will dive into the technical solution and the services it leverages, including code samples. The sample templates below can be stitched together using the AWS Serverless Application Model (SAM) and GitHub Actions to deploy a CI/CD pipeline for the full stack. A good tutorial is located here. The full solution can also be built manually through the console, or by importing the CloudFormation templates below for each of the key service components.

The first major design decision I had to make was which services and protocols to leverage as a cloud interface. My hand was forced, as I had already decided to purchase the AWS gateway by National Control Devices (NCD), which was built with native AWS IoT support for the sensors and gateways. However, there are some inherent benefits to using IoT. One benefit is that it provides a readily available secure endpoint to the cloud. IoT (specifically the MQTT protocol) leverages a publish / subscribe (pub / sub) model as opposed to poll / response. This means that the devices (and ultimately the gateway) are the single source of truth and publish data to a message broker. The message broker then allows consumers to subscribe to topics.

The communication between the producer and consumer should be encrypted. Ideally, the device would support certificate-based authentication and encryption by simply creating or providing a certificate (with public and private keys). The NCD gateway instead leverages an Identity and Access Management (IAM) user with an access key ID and secret access key to allow programmatic access to publish to IoT. My preference would have been certificate-based authentication rather than an IAM user that needs to be managed, but ultimately I accepted the risk, as this was a monitoring-only solution and I would ensure that the secret access keys were safely stored.

Regardless, there are some good practices to follow if you are going to go this route. AWS has a good reference document on IAM best practices. For my application, I wanted to ensure that the IAM user followed least privilege and only had an explicit policy allowing the AWS gateway to connect to IoT and publish messages to a topic that I allow. By providing a good policy document and keeping my secret access keys secret, I felt comfortable with the approach.

In addition to allowing for secure access, IoT allows for a serverless connection over the public internet, negating any requirement to manage servers or network connections. By just creating the IAM user with policies, I was able to quickly start consuming data into my AWS account.
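
To make the pub / sub idea concrete, here is a toy in-memory broker. It is only a sketch of the pattern, not MQTT and not how AWS IoT is implemented; the class name `TinyBroker` and the sample payload are made up for illustration.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class TinyBroker:
    """Toy broker: publishers push to a topic, subscribers get called back."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[str, dict], None]]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callable[[str, dict], None]) -> None:
        # Consumers register interest once; they never poll the device.
        self._subscribers[topic].append(callback)

    def publish(self, topic: str, payload: dict) -> None:
        # The publisher does not know (or care) who is listening.
        for callback in self._subscribers[topic]:
            callback(topic, payload)


broker = TinyBroker()
received = []
broker.subscribe("myWaterTopic", lambda topic, payload: received.append(payload))
broker.publish("myWaterTopic", {"level": 1200})   # delivered to the subscriber
broker.publish("someOtherTopic", {"level": 1})    # nobody subscribed, dropped
```

The point of the pattern is that the gateway only ever pushes data outward, so nothing in the cloud needs a network path back to the device.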

This CloudFormation template will create an IAM user, store the credentials in Secrets Manager, and only allow MQTT publish to the topic that we specify:

  IoTPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
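          - Sid: AllowConnectIoTPolicy
            Effect: Allow
            Action:
              - 'iot:Connect'
            # iot:Connect is authorized against a client resource, not a topic.
            # The wildcard is an assumption; narrow it to the gateway's MQTT client ID.
            Resource: !Sub 'arn:aws:iot:${AWS::Region}:${AWS::AccountId}:client/*'
          - Sid: AllowPublishIoTPolicy
            Effect: Allow
            Action:
              - 'iot:Publish'
            Resource: !Sub 'arn:aws:iot:${AWS::Region}:${AWS::AccountId}:topic/myWaterTopic'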
  IoTDevice:
    Type: AWS::IAM::User
    Properties:
      Path: /iotdevice/
      UserName: !Ref AWS::StackName
      ManagedPolicyArns:
        - !Ref IoTPolicy
  IoTCredentials:
    Type: AWS::IAM::AccessKey
    Properties:
      Status: Active
      UserName: !Ref IoTDevice
  IoTCredentialsStored:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: !Sub /iotdevice/credentials/${IoTDevice}
      SecretString: !Sub '{"ACCESS_KEY":"${IoTCredentials}","SECRET_KEY":"${IoTCredentials.SecretAccessKey}"}'
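
When it is time to enter the credentials into the gateway, they can be read back from Secrets Manager. The sketch below assumes the secret holds valid JSON with the `ACCESS_KEY` and `SECRET_KEY` keys used in the template; the function names are my own, and `boto3` is imported lazily so the parsing step can be checked offline.

```python
import json
from typing import Tuple


def parse_gateway_credentials(secret_string: str) -> Tuple[str, str]:
    """Split the stored JSON secret into (access key ID, secret access key)."""
    data = json.loads(secret_string)  # raises json.JSONDecodeError on malformed JSON
    return data["ACCESS_KEY"], data["SECRET_KEY"]


def fetch_gateway_credentials(secret_id: str) -> Tuple[str, str]:
    """Read the secret from Secrets Manager (needs AWS credentials and boto3)."""
    import boto3  # lazy import: only needed when actually calling AWS

    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_id)
    return parse_gateway_credentials(response["SecretString"])


# Offline check with a made-up secret string:
sample = '{"ACCESS_KEY":"AKIAEXAMPLE","SECRET_KEY":"not-a-real-secret"}'
access_key, secret_key = parse_gateway_credentials(sample)
```

Parsing the secret string once after deployment is a cheap way to confirm the template rendered well-formed JSON.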

Now that we are securely publishing data to IoT, I needed to think about the use cases and how we would be consuming the data. Our original intent was monitoring, and alarming if the water tank level reached a critical threshold or water flow into or out of the system was outside of a specified range. I also wanted the ability to trend level and flow and provide graphics that the neighborhood could use.

Using these requirements, I settled on SiteWise as the main engine for my asset and data model, graphics and long-term trending. For alarming, SiteWise uses AWS IoT Events out of the box, but I ultimately decided to build my own alarming engine using Lambda, IoT and SNS. This was because SiteWise alarming did not support hysteresis or custom alarm configuration beyond basic thresholds. For example, one use case was to detect stale data (which could be a loss of signal or a pegged 4-20 mA signal). To detect this, I wanted to look at the past 60 minutes of data and send an alarm if the values did not change.

One drawback of going the custom alarm engine route is that you can no longer leverage the IoT Events capability to silence alarms once they have been sent. This meant I needed a mechanism to track whether an alarm had been sent and to silence it for a set period before it is sent again. To accomplish this, I used the retained message support in AWS IoT. By publishing to an alarm-sent topic, all clients subscribing to that topic will know that an alarm was generated and when. Subsequent reads of an alarm condition can check this topic and compare timestamps. If the sensor is still in alarm and the elapsed time is greater than the allowable silence period, the alarm is sent again.
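
The silence decision itself is small enough to express as a pure function. This is a sketch of the rule described above, using the same retained payload shape (`Email_Sent`, `Timestamp`) that the alarm engine later in this post writes; the function name and the 8 hour default are just for illustration.

```python
def should_send_alarm(retained: dict, reading_ts: int, silence_hours: float = 8) -> bool:
    """Decide whether an active alarm should be emailed (again).

    retained   -- payload of the retained message, e.g.
                  {"Email_Sent": True, "Timestamp": 1676900000}
    reading_ts -- epoch seconds of the sensor reading that is in alarm
    """
    if not retained["Email_Sent"]:
        return True  # first time this alarm has been seen
    elapsed = abs(reading_ts - int(retained["Timestamp"]))
    return elapsed > silence_hours * 60 * 60  # silence window has expired


first = should_send_alarm({"Email_Sent": False, "Timestamp": 0}, 1000)
silenced = should_send_alarm({"Email_Sent": True, "Timestamp": 1000}, 1000 + 3600)
resent = should_send_alarm({"Email_Sent": True, "Timestamp": 1000}, 1000 + 8 * 3600 + 1)
```

Keeping the rule separate from the AWS calls makes it easy to test the silence period without publishing anything.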

For this exercise I created an empty SiteWise Asset Model with the correct Attributes, Measurements and Transforms, along with the Aliases that are used by Lambda. The transforms below take the raw signals (4-20 mA) and calculate actual GPM from the meters. Similarly, the transforms calculate tank level in both feet and percent from the raw millimeter values. Once the Asset Model is created, it is fairly straightforward to create the portal via the CLI or the console and apply permissions.

#Create a Basic Water System Object Model
  WaterMonitoringModel:
    Type: AWS::IoTSiteWise::AssetModel
    Properties:
      AssetModelDescription: Model for a Water Tank serving a neighborhood well
      AssetModelName: Water Monitoring Model
      AssetModelProperties:
        - DataType: DOUBLE
          LogicalId: SitewiseModelLowLevelAlarm
          Name: Low_Level_Alarm
          Type:
            TypeName: Attribute
            Attribute:
              DefaultValue: '1050'
        - DataType: DOUBLE
          LogicalId: SitewiseModelFlowMeterOORMA
          Name: Flow_Meter_OOR_MA
          Type:
            TypeName: Attribute
            Attribute:
              DefaultValue: '3.45'
        - DataType: DOUBLE
          LogicalId: SitewiseModelBadLevelAlarm
          Name: Bad_Level_Alarm
          Type:
            TypeName: Attribute
            Attribute:
              DefaultValue: '9999'
        - DataType: INTEGER
          LogicalId: SitewiseModelTankLevel
          Name: TankLevelValue
          Unit: 'mm'
          Type:
            TypeName: Measurement
        - DataType: INTEGER
          LogicalId: SitewiseModelAmbTemp
          Name: AmbientTemperature
          Unit: 'F'
          Type:
            TypeName: Measurement
        - DataType: DOUBLE
          LogicalId: SitewiseModelinFlowMA
          Name: inFlowMilliAmps
          Unit: 'mA'
          Type:
            TypeName: Measurement
        - DataType: DOUBLE
          LogicalId: SitewiseModeloutFlowMA
          Name: outFlowMilliAmps
          Unit: 'mA'
          Type:
            TypeName: Measurement
        - DataType: DOUBLE
          LogicalId: TankLevelPercentTransform
          Name: tankLevelPercent
          Unit: '%'
          Type:
            TypeName: Transform
            Transform:
              Expression: ' 100 / 3657.6 * ( 3657.6 - tanklevelvalue )'
              Variables:
                - Name: tanklevelvalue
                  Value:
                    PropertyLogicalId: SitewiseModelTankLevel
        - DataType: DOUBLE
          LogicalId: TankLevelFeetTransform
          Name: tankLevelFeet
          Unit: 'ft'
          Type:
            TypeName: Transform
            Transform:
              Expression: ' 1 / 305 * ( tanklevelvalue )'
              Variables:
                - Name: tanklevelvalue
                  Value:
                    PropertyLogicalId: SitewiseModelTankLevel
        - DataType: DOUBLE
          LogicalId: WellFlowGPMTransform
          Name: wellFlowGPM
          Unit: 'GPM'
          Type:
            TypeName: Transform
            Transform:
              Expression: ' if(gt(wellflowma,5),((wellflowma - 4) / (( 20 - 4 ) / ( 20 - 0))),0)'
              Variables:
                - Name: wellflowma
                  Value:
                    PropertyLogicalId: SitewiseModelinFlowMA
        - DataType: DOUBLE
          LogicalId: OutFlowGPMTransform
          Name: outFlowGPM
          Unit: 'GPM'
          Type:
            TypeName: Transform
            Transform:
              Expression: ' if(gt(outflowma,5),((outflowma - 4) / (( 20 - 4 ) / ( 20 - 0))),0)'
              Variables:
                - Name: outflowma
                  Value:
                    PropertyLogicalId: SitewiseModeloutFlowMA
  SitewiseAssetInstance:
    Type: AWS::IoTSiteWise::Asset
    Properties: 
      AssetDescription: Instance of my water monitoring Asset
      AssetModelId: !Ref WaterMonitoringModel
      AssetName: Water Monitoring
      AssetProperties:
        - Alias: 'tank_level_mm'
          LogicalId: SitewiseModelTankLevel
        - Alias: 'tank_in_flow_ma'
          LogicalId: SitewiseModelinFlowMA
        - Alias: 'tank_out_flow_ma'
          LogicalId: SitewiseModeloutFlowMA
        - Alias: 'ambient_temperature'
          LogicalId: SitewiseModelAmbTemp
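
The transform expressions in the model above can be sanity checked locally before deploying. The functions below mirror those expressions in Python; the function names are mine, and the constants (3657.6 mm tank height, 305 mm per foot, the 5 mA cut-off) are copied from the template rather than re-derived.

```python
TANK_HEIGHT_MM = 3657.6  # 12 ft, the constant used in the percent expression


def tank_level_percent(raw_mm: float) -> float:
    # 100 / 3657.6 * (3657.6 - tanklevelvalue)
    return 100 / TANK_HEIGHT_MM * (TANK_HEIGHT_MM - raw_mm)


def tank_level_feet(raw_mm: float) -> float:
    # 1 / 305 * tanklevelvalue (305 approximates 304.8 mm per foot)
    return 1 / 305 * raw_mm


def flow_gpm(milliamps: float) -> float:
    # if(gt(ma, 5), (ma - 4) / ((20 - 4) / (20 - 0)), 0)
    # Maps 4-20 mA onto 0-20 GPM and reports 0 at or below 5 mA.
    if milliamps > 5:
        return (milliamps - 4) / ((20 - 4) / (20 - 0))
    return 0.0


half_scale_flow = flow_gpm(12)            # about 10 GPM
full_tank = tank_level_percent(0)         # about 100 %
empty_tank = tank_level_percent(3657.6)   # 0 %
```

Note that the percent transform treats the raw reading as a distance down from the top of the tank, so a larger millimeter value means less water.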

With the SiteWise model created, I still needed a mechanism for piping data from IoT into SiteWise. I accomplished this by creating a rule in IoT that listens on a topic and then invokes a Lambda to publish data to SiteWise using the measurement aliases we created above. The Lambda inspects the payload to determine which signal published the new message by looking for the matching measurement keys. A total of 3 Lambda functions are leveraged for the full solution. A description of what they do is below:

| Lambda | Description | Trigger |
|---|---|---|
| Sitewise Publisher | ETL for sensor payload | Sensor publish on topic |
| Sitewise Alarm Engine | Alarm condition monitor | EventBridge (every 10 minutes) |
| Current ambient temperature | Google temperature scraper for local conditions | EventBridge (every 10 minutes) |

Here is sample code for the Sitewise Publisher. This Lambda is configured to run from an IoT rule. Configure the alias IDs as environment variables and let the Lambda assume a role with IoT SiteWise permissions.

import json
import math
import time
import boto3
import os

#sitewise alias id's are defined as environmental variables in lambda
def lambda_handler(event, context):
    mytime = time.time()
    mysec = math.floor(mytime)
    entryID = 'Entry_' + str(mysec)
    propValList=[]
    propValDict={}

    if 'mA_1' in event['data']: #flow #1
        doubVal=event['data']['mA_1']
        alliasID=os.environ['flow_in_alias_id']
        propValDict['value']={'doubleValue': doubVal}
        propValDict['timestamp']={'timeInSeconds': mysec}
        propValList.append(propValDict)
        write_sitewise_values(entryID,alliasID,propValList)
        propValList=[]
        propValDict={}
    if 'mA_2' in event['data']: #flow #2
        doubVal=event['data']['mA_2']
        alliasID=os.environ['flow_out_alias_id']
        propValDict['value']={'doubleValue': doubVal}
        propValDict['timestamp']={'timeInSeconds': mysec}
        propValList.append(propValDict)
        write_sitewise_values(entryID,alliasID,propValList)
        propValList=[]
        propValDict={}
    elif 'level' in event['data']: #level
        intVal = event['data']['level']
        alliasID = os.environ['level_alias_id']
        propValDict['value']={'integerValue': intVal}
        propValDict['timestamp']={'timeInSeconds': mysec}
        propValList.append(propValDict)
        if intVal != 9999: #Do not write bad value to sitewise - let alarming engine deal
            write_sitewise_values(entryID,alliasID,propValList)
    return {
        'statusCode': 200,
        'body': json.dumps('Publish to SiteWise successful!')
    }

def write_sitewise_values(entryID,alliasID,propValList):
    client=boto3.client('iotsitewise')
    response = client.batch_put_asset_property_value(
    entries=[
        {
            'entryId': entryID ,
            'propertyAlias': alliasID,
            'propertyValues': propValList
        },
    ]
    )
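
The payload routing in the handler above can also be factored into a pure function, which makes it testable without AWS access. This is an alternative sketch, not the code I deployed: `build_entries` and the `aliases` dictionary are hypothetical names, it checks each key independently, and it returns entries in the shape that `batch_put_asset_property_value` expects.

```python
from typing import Dict, List

BAD_LEVEL = 9999  # sentinel the handler above refuses to write


def build_entries(data: dict, now_sec: int, aliases: Dict[str, str]) -> List[dict]:
    """Return one SiteWise entry per recognised key in the sensor payload."""
    entries: List[dict] = []

    def add(alias: str, value_key: str, value) -> None:
        entries.append({
            # entryId must be unique within one batch, hence the index suffix.
            "entryId": f"Entry_{now_sec}_{len(entries)}",
            "propertyAlias": alias,
            "propertyValues": [{
                "value": {value_key: value},
                "timestamp": {"timeInSeconds": now_sec},
            }],
        })

    if "mA_1" in data:  # flow #1
        add(aliases["flow_in"], "doubleValue", data["mA_1"])
    if "mA_2" in data:  # flow #2
        add(aliases["flow_out"], "doubleValue", data["mA_2"])
    if "level" in data and data["level"] != BAD_LEVEL:  # skip bad level readings
        add(aliases["level"], "integerValue", data["level"])
    return entries


aliases = {"flow_in": "tank_in_flow_ma", "flow_out": "tank_out_flow_ma", "level": "tank_level_mm"}
entries = build_entries({"mA_1": 7.5, "level": 1200}, 1676900000, aliases)
```

The resulting list could then be passed as `entries=` in a single SiteWise call instead of one call per signal.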

Once data is piped into SiteWise, the service handles everything else. Just create a portal and populate meaningful trends.

The alarming engine is the most critical function in this solution. As mentioned earlier in the post, it leverages retained message support and uses SNS to send out alarms. The basic code looks like this:

import os
import json
import boto3
import botocore
import time
import math

def lambda_handler(event, context):

    MAX_ALARM_TIME = 8 #Hours
    MAX_STALE_TIME = 1 #Hours

    userEmail = os.environ['User_Email']
    asset_Id = os.environ['asset_ID']

    #AssetAlarmProperties
    tankLevelAlarmID = os.environ['tank_level_alarm_id']
    flowInmaAlarmID = os.environ['flow_in_alarm_id']
    flowOutmaAlarmID = os.environ['flow_out_alarm_id']

    sitewisePortal = os.environ['sitewise_portal']
    snsArn = os.environ['sns_arn']

    #regex strings for alarming
    regex_level_string = 'tank_level'
    regex_flow_string = 'flow_ma'
   
    #use ioT retained messages for alarming, uses known sensor failure modes
    errorTopicList=['tank_level_mm', 'tank_in_flow_ma','tank_out_flow_ma', 'tank_level_mm_stale', 'tank_in_flow_ma_stale','tank_out_flow_ma_stale']

    defaultPayload = {"Email_Sent": False,"Timestamp": int(round(time.time()))}

    #sitewise alarm thresholds
    #each entry: [property alias, value type, attribute name in the asset model, attribute property id]
    propertyAliasList=[['tank_level_mm','integerValue','Low_Level_Alarm',tankLevelAlarmID],
                       ['tank_in_flow_ma','doubleValue','Flow_Meter_OOR_MA',flowInmaAlarmID],
                       ['tank_out_flow_ma','doubleValue','Flow_Meter_OOR_MA',flowOutmaAlarmID]]

    runtime_region = os.environ['AWS_REGION']

    iot_client = boto3.client('iot-data',region_name=runtime_region)
    sw_client=boto3.client('iotsitewise', region_name=runtime_region)

    #ensure retained messages exist on ioT Core/if not then create
    for item in errorTopicList: 
        try:
            response = iot_client.get_retained_message(topic=f'alarms/{item}')
        except botocore.exceptions.ClientError as error:  #repost default payload for alarm topic
            if error.response['Error']['Code'] == 'ResourceNotFoundException':
                iot_write_retained_msg(runtime_region,f'alarms/{item}',defaultPayload)
        else:
            pass

    #First check threshold based alarming - Loop through sensors to check
    for propVal in propertyAliasList:
        Valresponse = sw_client.get_asset_property_value(
            propertyAlias = str(propVal[0]).strip()
            )
    
        measured_Value=Valresponse['propertyValue']['value'][propVal[1]]
        timeStamp=Valresponse['propertyValue']['timestamp']['timeInSeconds']
    
        response = sw_client.get_asset_property_value(
            assetId=asset_Id,
            propertyId=propVal[3]
            )
        threshold = response['propertyValue']['value']['doubleValue']

        response = iot_client.get_retained_message(topic=f'alarms/{propVal[0]}')
        val=response['payload'].decode('utf-8')
        iotRmEmailState=json.loads(val)
        StaleValue = abs(timeStamp-int(iotRmEmailState['Timestamp']))
    
        #Tank level threshold check
        #Use list comprehension with regex string
        if (([v for v in propVal if regex_level_string in v]) and measured_Value > threshold):
            cw_Payload={'Sensor': str(propVal[0]),'Timestamp': timeStamp,'StaleValue': StaleValue,'Measured_Value':measured_Value, 'Threshold':threshold, 'Email_Sent': iotRmEmailState['Email_Sent']}
        
            if (iotRmEmailState['Email_Sent']==False or  StaleValue > (MAX_ALARM_TIME*60*60)):
                messageStart = f'Water level of {measured_Value} mm exceeds threshold {threshold} mm.  This is 35% of normal level.  View Sitewise page for realtime level and flow (username: {userEmail}) \n'
                subjectEmail = 'Water level is low - Please conserve'
                send_alarm_sns(str(propVal[0]),measured_Value,threshold,messageStart,subjectEmail,sitewisePortal,snsArn)
            
                #update retained message for email sent
                iot_Payload={"Email_Sent": True, "Timestamp": timeStamp}
                iot_write_retained_msg(runtime_region,f'alarms/{propVal[0]}',iot_Payload)
            
                print(f'Alarm is active, Email has been sent: {cw_Payload}') #write to cloudwatch, alarm email was sent

            else: #Tank level in alarm, currently alarm is silenced
                print(f'Alarm is active, Email is silenced: {cw_Payload}') #write to cloudwatch, alarm is silenced but exceeds threshold
    
        #Check if 4-20mA sensors fail range
        #Currently checking low current range
        #Use list comprehension with regex string
        if (([v for v in propVal if regex_flow_string in v]) and measured_Value < threshold):
            cw_Payload={'Sensor': str(propVal[0]),'Timestamp': timeStamp,'StaleValue': StaleValue,'Measured_Value':measured_Value, 'Threshold':threshold,'Email_Sent': iotRmEmailState['Email_Sent']}
        
            if (iotRmEmailState['Email_Sent']==False or  StaleValue > (MAX_ALARM_TIME*60*60)):
                messageStart = f'Flow sensor {propVal[0]} current of {measured_Value} mA is below threshold {threshold} mA.  View Sitewise page for realtime level and flow (username: {userEmail}) \n'
                subjectEmail = f'Water flow sensor {propVal[0]} out of range - Check immediately'
                send_alarm_sns(str(propVal[0]),measured_Value,threshold,messageStart,subjectEmail,sitewisePortal,snsArn)
            
                #update retained message for email sent
                iot_Payload={"Email_Sent": True, "Timestamp": timeStamp}
                iot_write_retained_msg(runtime_region,f'alarms/{propVal[0]}',iot_Payload)

                print(f'Alarm is active, Email has been sent: {cw_Payload}') #write to cloudwatch, alarm email was sent

            else: #flow sensor is in alarm, currently alarm is silenced
                print(f'Alarm is active, Email is silenced: {cw_Payload}') #write to cloudwatch, alarm is silenced but sensor is out of range

    #now check stale values for last 60 minutes
    myCurtime = time.time()
    myCursec = math.floor(myCurtime)
    myLastsec = myCursec - (MAX_STALE_TIME*60*60)
    for propVal in propertyAliasList:
        wellResponse = sw_client.get_asset_property_value_history(
            propertyAlias=str(propVal[0]).strip(),
            startDate=myLastsec,
            endDate=myCursec
        )
    
        histvalList=[]
        for val in wellResponse['assetPropertyValueHistory']:
            if propVal[0] =='tank_level_mm':
                histvalList.append(val['value']['integerValue'])
            else:
                histvalList.append(val['value']['doubleValue'])
    
        isStaleData, isStaleDataMsg=check_stale_values(histvalList,(MAX_STALE_TIME*60*60))
        
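        if isStaleData: #stale or missing data - log it here; sending and silencing follows the same pattern as the threshold checks above
            cw_Payload={'Sensor': str(propVal[0]),'Timestamp': myCurtime,'StaleTime': myLastsec,'HistValList': histvalList}
            print(f'{isStaleDataMsg}: {cw_Payload}') #write to cloudwatch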
        
    return {
        'statusCode': 200,
        'body': json.dumps('Neighborhood Water Monitor Completed!')
    }

#helper function to check if values are stale or missing
def check_stale_values(histvalList,timeRange):
    if len(histvalList) == 0:
        return True, f'No historical values exist for range {timeRange} seconds - no sitewise data'
    elif all(ele == histvalList[0] for ele in histvalList):
        return True, f'Sitewise is using stale data for range {timeRange} seconds'
    else:
        return False, 'Data is not stale'

#helper function updates retained message
def iot_write_retained_msg(runtime_region,topic_item,payloadVal):
    iot_client = boto3.client('iot-data',region_name=runtime_region)
    response = iot_client.publish(
                topic=topic_item,
                qos=1,
                retain=True,
                payload=json.dumps(payloadVal)
            )

#Send SNS
def send_alarm_sns(propVal,measured_Value, threshold,messageStart,subjectEmail,sitewisePortal,snsArn):  
    message =messageStart
    message+=f"Property Value: {propVal} \n"
    message+=f"Measured Value of {measured_Value} has crossed the threshold of {threshold} \n"
    message+="\n"
    message+=sitewisePortal
    client = boto3.client('sns')
    response = client.publish(
        TargetArn=snsArn,
        Message=message,
        MessageStructure='text',
        Subject=subjectEmail,
    )
    return {
        'statusCode': 200,
        'body': json.dumps('Email Sent')
    }
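
Since the lack of hysteresis was one reason for building a custom engine, here is a minimal sketch of how it could be layered onto a high-side threshold check like the tank level one above. This is not part of the deployed code: the function name and the separate `clear` threshold are assumptions for illustration.

```python
def update_alarm_state(in_alarm: bool, value: float, trip: float, clear: float) -> bool:
    """High-side alarm with hysteresis.

    The alarm trips when value rises above `trip` and only clears once
    value falls below `clear`, so a reading hovering around a single
    threshold does not toggle the alarm on every sample.
    """
    if clear > trip:
        raise ValueError("clear threshold must not be above trip threshold")
    if in_alarm:
        return value >= clear  # stay in alarm until we drop below clear
    return value > trip        # only trip above the higher threshold


# Walk a series of raw level readings (mm) through the state machine.
states = []
state = False
for reading in [1000, 1060, 1040, 1010, 990]:
    state = update_alarm_state(state, reading, trip=1050, clear=1000)
    states.append(state)
```

With a single threshold of 1050, the readings 1040 and 1010 would have cleared the alarm immediately; with the band it stays active until the reading drops below 1000.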

The last fun part of the solution is providing a localized temperature without an actual sensor. For this, I just scrape Google for the current temperature in my city and then publish it to SiteWise. This allows us to track water usage as a function of temperature over time.

import json
import bs4
import requests
import boto3
import time
import math

def lambda_handler(event, context):
    url='https://www.google.com/search?q=san+francisco+current+temperature'
    html=requests.get(url).content
    soup=bs4.BeautifulSoup(html,'html.parser')
    full_temp=soup.find('div', attrs={'class': 'BNeawe iBp4i AP7Wnd'}).text
    makeTempStr=''

    mytime = time.time()
    mysec = math.floor(mytime)
    entryID = 'Entry_' + str(mysec)
    propValList=[]
    propValDict={}

    client=boto3.client('iotsitewise')

    for val in full_temp:
        if not ord(val)==176:
            makeTempStr+=val
        else:
            break
    try:
        makeTempInt=int(makeTempStr)
    except ValueError: #scrape did not return a number - skip the write
        print(f'error: could not parse temperature from {full_temp!r}')
        return {
            'statusCode': 500,
            'body': json.dumps('Temperature could not be parsed')
        }

    intValue=makeTempInt
    alliasID='ambient_temperature'
    propValDict['value']={'integerValue': intValue}
    propValDict['timestamp']={'timeInSeconds': mysec}
    propValList.append(propValDict)
    write_sitewise_values(entryID,alliasID,propValList)

    return {
        'statusCode': 200,
        'body': json.dumps('Temperature retrieved!')
    }

def write_sitewise_values(entryID,alliasID,propValList):
    client=boto3.client('iotsitewise')

    response = client.batch_put_asset_property_value(
    entries=[
        {
            'entryId': entryID ,
            'propertyAlias': alliasID,
            'propertyValues': propValList
        },
    ]
    )
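
The character loop that strips the degree sign can also be written as a small standalone parser, which is easier to test against odd scrape results. This is a sketch with a made-up function name; it takes the leading integer from the scraped text and returns None when there is none, rather than stopping at the degree character as the handler above does.

```python
import re
from typing import Optional


def parse_temperature(text: str) -> Optional[int]:
    """Return the leading integer of a string like '57°F', or None."""
    match = re.match(r"\s*(-?\d+)", text)
    return int(match.group(1)) if match else None


warm = parse_temperature("57\u00b0F")    # '\u00b0' is the degree sign
cold = parse_temperature("-3\u00b0C")
missing = parse_temperature("n/a")
```

Returning None gives the caller an explicit signal to skip the SiteWise write when the page layout changes and the scrape comes back empty.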

The only remaining task is to build your SNS notification topic and give the alarming Lambda the correct permissions to publish to it. I would love to see any comments on the solution above and potential improvements.
