Measurement (External)
This example shows how to:
- Start a measurement without built-in predictions
- Subscribe to absorbance spectrum changes
- Write a prediction back to the device
- Stop the measurement
The addition of external predictions allows you to use your own machine learning models or analysis algorithms while still leveraging the Monipa 1 to handle data acquisition.
| Measurement with external predictions |
|---|
| import asyncio
import json
import time
from asyncua import Client
# Endpoint URL passed to the asyncua Client in main().
url = "opc.tcp://localhost:4840/freeopcua/server/"
# Measurement name sent with the Start and UpdatePredictions calls; the current
# timestamp is appended so that each run uses a different name.
name = f"example-start-measurement{time.time()}"
class SubHandler:
    """Subscription handler that writes an external prediction per spectrum.

    For every non-empty spectrum received after the first one, the handler
    calls the ``UpdatePredictions`` method with the measurement name, the
    zero-based sample index and a JSON object holding the mean of the
    spectrum under the key ``"fake"``.
    """

    def __init__(self, client, measurement_name=None):
        """Store the client and reset the sample counter.

        Args:
            client: Connected client used to call ``UpdatePredictions``.
            measurement_name: Name of the measurement the predictions belong
                to. When omitted, the module-level ``name`` is used.
        """
        self.client: Client = client
        self.measurement_name = measurement_name
        # Counts every non-empty spectrum seen, including the discarded first one.
        self.n_samples = 0

    async def datachange_notification(self, node, spectrum, data):
        """Handle a data change of the subscribed spectrum node.

        Args:
            node: Node whose value changed (unused).
            spectrum: New value of the node; it is summed and measured with
                ``len``, so a non-empty sequence of numbers is expected.
            data: Notification details (unused).
        """
        # skip initial value
        if not spectrum:
            return

        # NOTE: skipping the first spectrum is not required if you did not start any measurements before
        # or in case you are using a backend version >= 4.5.2
        if self.n_samples == 0:
            print("Clearing initial spectrum from buffer...")
            self.n_samples += 1
            return

        # The first spectrum was discarded, so prediction indices start at 0.
        sample_index = self.n_samples - 1
        try:
            # Resolve the name inside the try block so that a missing
            # module-level ``name`` is reported like any other failure.
            if self.measurement_name is not None:
                measurement = self.measurement_name
            else:
                measurement = name
            await self.client.nodes.objects.call_method(
                "2:Monipa/2:MeasurementsExternal/2:UpdatePredictions",
                measurement,
                sample_index,
                json.dumps({"fake": sum(spectrum) / len(spectrum)}),
            )
            print(f"Updated predictions for sample {sample_index}")
        except Exception:
            # Best effort: report the failure and keep processing later spectra.
            import traceback

            print("Failed to update predictions:")
            print(traceback.format_exc())

        # Advance the counter whether or not the update succeeded.
        self.n_samples += 1
async def main():
    """Run a measurement without built-in analytics and feed it external predictions.

    Connects to the server at ``url``, starts a measurement called ``name``,
    subscribes a ``SubHandler`` to the processed absorbance spectrum, lets it
    run for 30 seconds and then stops the measurement. The measurement is
    stopped even if subscribing or waiting fails or is cancelled.
    """
    async with Client(url=url) as client:
        start_config = {
            "name": name,
            "analytics_type": "no_analytics",
            "spectrometer_resolution": 400,
            "spectrometer_measurement_time": 10,
            "blanking_mode": "first_spectrum",
        }
        await client.nodes.objects.call_method(
            "2:Monipa/2:Measurement/2:Start",
            json.dumps(start_config),
        )

        try:
            node = await client.nodes.objects.get_child(
                "2:Monipa/2:Measurement/2:SpectraProcessedAbsorbance",
            )

            handler = SubHandler(client)
            # NOTE: we create a subscription with 500 ms sampling interval, the actual update rate might be lower
            subscription = await client.create_subscription(500, handler)
            await subscription.subscribe_data_change(node)

            await asyncio.sleep(30)
        finally:
            # Always stop the measurement that was started above, so an error
            # or interruption does not leave it running.
            await client.nodes.objects.call_method("2:Monipa/2:Measurement/2:Stop")

        # make sure we also print the last data update
        await asyncio.sleep(5)
        print("Measurement stopped.")
if __name__ == "__main__":
    # Run the example only when this file is executed as a script.
    asyncio.run(main())
|