Skip to content

Measurement

This example example shows how to:

  1. Start a measurement
  2. Read out predictions
  3. Stop the measurement

Note

For brevity, this and other examples do not include error handling. This will be discussed separately in the errors subsection.

Measurement
import asyncio
import json

from asyncua import Client

url = "opc.tcp://10.0.0.2:4840/freeopcua/server/"
name = "example-start-measurement"


class SubHandler:
    async def datachange_notification(self, node, val, data):
        # skip initial value
        if not val:
            return

        # NOTE: the predictions are returned as a list in the order of the components specified when starting the measurement
        #       this corresponds to the order of the GetComponents method
        protein, glucose = val

        print(f"Current concentration prediction: {protein=:.4f} {glucose=:.4f} g/L")


async def main():
    async with Client(url=url) as client:
        await client.nodes.objects.call_method(
            "2:Monipa/2:Measurement/2:Start",
            # NOTE: the measurement configuration is passed as JSON string
            json.dumps(
                {
                    # the filename under which the measurement data will be stored
                    "name": name,
                    # which component to analyze, we can choose multiple components as well
                    "components": ["protein", "glucose"],
                    # set resolution to 4 cm^-1
                    "spectrometer_resolution": 400,
                    # how long we collect data for each data point in seconds
                    "spectrometer_measurement_time": 10,
                    # we only use one spectrum for blanking to speed up the example, it is recommended to leave this at 'default'
                    "blanking_mode": "first_spectrum",
                }
            ),
        )

        components = await client.nodes.objects.call_method(
            "2:Monipa/2:Measurement/2:GetComponents",
        )
        print(f"Started measuring components: {components}")

        node = await client.nodes.objects.get_child(
            "2:Monipa/2:Measurement/2:Predictions"
        )
        handler = SubHandler()

        # NOTE: we create a subscription with 500 ms sampling interval, the actual update rate might be lower
        subscription = await client.create_subscription(500, handler)
        await subscription.subscribe_data_change(node)

        await asyncio.sleep(30)
        await client.nodes.objects.call_method("2:Monipa/2:Measurement/2:Stop")

        # make sure we also print the last data update
        await asyncio.sleep(5)

        print("Measurement stopped.")


if __name__ == "__main__":
    asyncio.run(main())