Measurement
This example example shows how to:
- Start a measurement
- Read out predictions
- Stop the measurement
Note
For brevity, this and other examples do not include error handling. This will be discussed separately in the errors subsection.
| Measurement |
|---|
| import asyncio
import json
from asyncua import Client
url = "opc.tcp://10.0.0.2:4840/freeopcua/server/"
name = "example-start-measurement"
class SubHandler:
async def datachange_notification(self, node, val, data):
# skip initial value
if not val:
return
# NOTE: the predictions are returned as a list in the order of the components specified when starting the measurement
# this corresponds to the order of the GetComponents method
protein, glucose = val
print(f"Current concentration prediction: {protein=:.4f} {glucose=:.4f} g/L")
async def main():
async with Client(url=url) as client:
await client.nodes.objects.call_method(
"2:Monipa/2:Measurement/2:Start",
# NOTE: the measurement configuration is passed as JSON string
json.dumps(
{
# the filename under which the measurement data will be stored
"name": name,
# which component to analyze, we can choose multiple components as well
"components": ["protein", "glucose"],
# set resolution to 4 cm^-1
"spectrometer_resolution": 400,
# how long we collect data for each data point in seconds
"spectrometer_measurement_time": 10,
# we only use one spectrum for blanking to speed up the example, it is recommended to leave this at 'default'
"blanking_mode": "first_spectrum",
}
),
)
components = await client.nodes.objects.call_method(
"2:Monipa/2:Measurement/2:GetComponents",
)
print(f"Started measuring components: {components}")
node = await client.nodes.objects.get_child(
"2:Monipa/2:Measurement/2:Predictions"
)
handler = SubHandler()
# NOTE: we create a subscription with 500 ms sampling interval, the actual update rate might be lower
subscription = await client.create_subscription(500, handler)
await subscription.subscribe_data_change(node)
await asyncio.sleep(30)
await client.nodes.objects.call_method("2:Monipa/2:Measurement/2:Stop")
# make sure we also print the last data update
await asyncio.sleep(5)
print("Measurement stopped.")
if __name__ == "__main__":
asyncio.run(main())
|