Skip to content

Measurement (External)

This example example shows how to:

  1. Start a measurement without built-in predictions
  2. Subscribe to absorbance spectrum changes
  3. Write a prediction back to the device
  4. Stop the measurement

The addition of external predictions allows you to use your own machine learning models or analysis algorithms while still leveraging the Monipa 1 to handle data acquisition.

Measurement with external predictions
import asyncio
import json
import time

from asyncua import Client

url = "opc.tcp://localhost:4840/freeopcua/server/"
name = f"example-start-measurement{time.time()}"


class SubHandler:
    def __init__(self, client):
        self.client: Client = client
        self.n_samples = 0

    async def datachange_notification(self, node, spectrum, data):
        # skip initial value
        if not spectrum:
            return

        # NOTE: skipping the first spectrum is not required if you did not start any measurements before
        #       or in case you are using a backend version >= 4.5.2
        if self.n_samples == 0:
            print("Clearing initial spectrum from buffer...")
            self.n_samples += 1
            return

        try:
            await self.client.nodes.objects.call_method(
                "2:Monipa/2:MeasurementsExternal/2:UpdatePredictions",
                name,
                self.n_samples - 1,
                json.dumps({"fake": sum(spectrum) / len(spectrum)}),
            )
            print(f"Updated predictions for sample {self.n_samples - 1}")

        except Exception:
            import traceback

            print("Failed to update predictions:")
            print(traceback.format_exc())

        self.n_samples += 1


async def main():
    async with Client(url=url) as client:
        await client.nodes.objects.call_method(
            "2:Monipa/2:Measurement/2:Start",
            json.dumps(
                {
                    "name": name,
                    "analytics_type": "no_analytics",
                    "spectrometer_resolution": 400,
                    "spectrometer_measurement_time": 10,
                    "blanking_mode": "first_spectrum",
                }
            ),
        )

        node = await client.nodes.objects.get_child(
            "2:Monipa/2:Measurement/2:SpectraProcessedAbsorbance",
        )
        handler = SubHandler(client)

        # NOTE: we create a subscription with 500 ms sampling interval, the actual update rate might be lower
        subscription = await client.create_subscription(500, handler)
        await subscription.subscribe_data_change(node)

        await asyncio.sleep(30)
        await client.nodes.objects.call_method("2:Monipa/2:Measurement/2:Stop")

        # make sure we also print the last data update
        await asyncio.sleep(5)

        print("Measurement stopped.")


if __name__ == "__main__":
    asyncio.run(main())