Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
grzegorz-roboflow committed Nov 28, 2024
1 parent 7269b9a commit bc67eab
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 16 deletions.
2 changes: 2 additions & 0 deletions inference/core/workflows/core_steps/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@
EmailNotificationBlockV1,
)
from inference.core.workflows.core_steps.sinks.local_file.v1 import LocalFileSinkBlockV1
from inference.core.workflows.core_steps.sinks.opc_writer.v1 import OPCWriterSinkBlockV1
from inference.core.workflows.core_steps.sinks.roboflow.custom_metadata.v1 import (
RoboflowCustomMetadataBlockV1,
)
Expand Down Expand Up @@ -540,6 +541,7 @@ def load_blocks() -> List[Type[WorkflowBlock]]:
DataAggregatorBlockV1,
CSVFormatterBlockV1,
EmailNotificationBlockV1,
OPCWriterSinkBlockV1,
LocalFileSinkBlockV1,
TraceVisualizationBlockV1,
ReferencePathVisualizationBlockV1,
Expand Down
24 changes: 8 additions & 16 deletions inference/core/workflows/core_steps/sinks/opc_writer/v1.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
import logging
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from datetime import datetime
from functools import partial
from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union
from typing import List, Literal, Optional, Tuple, Type, Union

from asyncua.client import Client as AsyncClient
from asyncua.sync import Client, sync_async_client_method
from asyncua.ua.uaerrors import BadNoMatch, BadTypeMismatch, BadUserAccessDenied
from fastapi import BackgroundTasks
from pydantic import ConfigDict, Field

from inference.core.workflows.core_steps.common.query_language.entities.operations import (
AllOperationsType,
)
from inference.core.workflows.core_steps.common.query_language.operations.core import (
build_operations_chain,
)
from inference.core.workflows.execution_engine.entities.base import OutputDefinition
from inference.core.workflows.execution_engine.entities.types import (
BOOLEAN_KIND,
Expand Down Expand Up @@ -105,7 +98,7 @@ class BlockManifest(WorkflowBlockManifest):
json_schema_extra={
"name": "OPC Writer Sink",
"version": "v1",
"short_description": "Pushes data to OPC server",
"short_description": "Pushes data to OPC server, this block is making use of [asyncua](https://github.com/FreeOpcUa/opcua-asyncio)",
"long_description": LONG_DESCRIPTION,
"license": "Apache-2.0",
"block_type": "sink",
Expand All @@ -126,10 +119,6 @@ class BlockManifest(WorkflowBlockManifest):
{"namespace": "http://examples.freeopcua.github.io"},
],
)
object_name: Union[Selector(kind=[STRING_KIND]), str] = Field(
description="Name of object to be searched in namespace",
examples=[{"object_name": "$inputs.opc_object_name"}, {"object_name": "Line1"}],
)
user_name: Union[Selector(kind=[STRING_KIND]), Optional[str]] = Field(
default=None,
description="Optional user name to be used for authentication when connecting to OPC server",
Expand All @@ -140,6 +129,10 @@ class BlockManifest(WorkflowBlockManifest):
description="Optional password to be used for authentication when connecting to OPC server",
examples=[{"password": "$inputs.opc_password"}, {"password": "secret"}],
)
object_name: Union[Selector(kind=[STRING_KIND]), str] = Field(
description="Name of object to be searched in namespace",
examples=[{"object_name": "$inputs.opc_object_name"}, {"object_name": "Line1"}],
)
variable_name: Union[Selector(kind=[STRING_KIND]), str] = Field(
description="Name of variable to be set under found object",
examples=[
Expand All @@ -148,7 +141,7 @@ class BlockManifest(WorkflowBlockManifest):
],
)
value: Union[
Selector(kind=Union[BOOLEAN_KIND, FLOAT_KIND, INTEGER_KIND, STRING_KIND]),
Selector(kind=[BOOLEAN_KIND, FLOAT_KIND, INTEGER_KIND, STRING_KIND]),
Union[bool, float, int, str],
] = Field(
description="value to be written into variable",
Expand Down Expand Up @@ -296,7 +289,7 @@ def opc_connect_and_write_value(
timeout: int,
) -> Tuple[bool, str]:
try:
success = _opc_connect_and_write_value(
_opc_connect_and_write_value(
url=url,
namespace=namespace,
user_name=user_name,
Expand Down Expand Up @@ -343,7 +336,6 @@ def _opc_connect_and_write_value(
client
)

# Find the namespace index
try:
nsidx = get_namespace_index(namespace)
except ValueError as exc:
Expand Down
1 change: 1 addition & 0 deletions requirements/_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
aiortc>=1.9.0,<2.0.0
APScheduler>=3.10.1,<4.0.0
asyncua<=1.1.5
cython~=3.0.0
python-dotenv~=1.0.0
fastapi>=0.100,<0.111
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
import asyncio
import threading
import time
from typing import Optional, Union

from asyncua import Server
from asyncua.client import Client as AsyncClient
from asyncua.server.users import UserRole, User
from asyncua.sync import Client, sync_async_client_method
from asyncua.ua.uaerrors import BadNoMatch, BadTypeMismatch, BadUserAccessDenied

from inference.core.env import WORKFLOWS_MAX_CONCURRENT_STEPS
from inference.core.workflows.execution_engine.core import ExecutionEngine
from tests.workflows.integration_tests.execution.workflows_gallery_collector.decorators import (
add_to_workflows_gallery,
)

WORKFLOW_OPC_WRITER = {
"version": "1.0",
"inputs": [
{"type": "InferenceParameter", "name": "opc_url"},
{"type": "InferenceParameter", "name": "opc_namespace"},
{"type": "InferenceParameter", "name": "opc_user_name"},
{"type": "InferenceParameter", "name": "opc_password"},
{"type": "InferenceParameter", "name": "opc_object_name"},
{"type": "InferenceParameter", "name": "opc_variable_name"},
{"type": "InferenceParameter", "name": "opc_value"},
],
"steps": [
{
"type": "roboflow_core/opc_writer_sink@v1",
"name": "opc_writer",
"url": "$inputs.opc_url",
"namespace": "$inputs.opc_namespace",
"user_name": "$inputs.opc_user_name",
"password": "$inputs.opc_password",
"object_name": "$inputs.opc_object_name",
"variable_name": "$inputs.opc_variable_name",
"value": "$inputs.opc_value",
"fire_and_forget": False
}
],
"outputs": [
{
"type": "JsonField",
"name": "opc_writer_results",
"selector": "$steps.opc_writer.*",
}
],
}


OPC_SERVER_STARTED = False
STOP_OPC_SERVER = False


def start_loop(loop: asyncio.AbstractEventLoop):
asyncio.set_event_loop(loop)
loop.run_forever()


users_db = {"user1": "password1"}


class UserManager:
def get_user(self, iserver, username=None, password=None, certificate=None):
if username in users_db and password == users_db[username]:
return User(role=UserRole.User)
return None


async def start_test_opc_server(
url: str,
namespace: str,
object_name: str,
variable_name: str,
initial_value: float,
):
global OPC_SERVER_STARTED
global STOP_OPC_SERVER
server = Server(user_manager=UserManager())
await server.init()
server.set_endpoint(url)

uri = namespace
idx = await server.register_namespace(uri)

myobj = await server.nodes.objects.add_object(idx, object_name)
myvar = await myobj.add_variable(idx, variable_name, initial_value)
await myvar.set_writable()
OPC_SERVER_STARTED = True
async with server:
while not STOP_OPC_SERVER:
await asyncio.sleep(1)


def _opc_connect_and_read_value(
url: str,
namespace: str,
user_name: Optional[str],
password: Optional[str],
object_name: str,
variable_name: str,
timeout: int,
) -> Union[bool, float, int, str]:
client = Client(url=url, sync_wrapper_timeout=timeout)
if user_name and password:
client.set_user(user_name)
client.set_password(password)
try:
client.connect()
except BadUserAccessDenied as exc:
client.disconnect()
raise Exception(f"AUTH ERROR: {exc}")
except OSError as exc:
client.disconnect()
raise Exception(f"NETWORK ERROR: {exc}")
except Exception as exc:
client.disconnect()
raise Exception(f"UNHANDLED ERROR: {type(exc)} {exc}")
get_namespace_index = sync_async_client_method(AsyncClient.get_namespace_index)(
client
)

try:
nsidx = get_namespace_index(namespace)
except ValueError as exc:
client.disconnect()
raise Exception(f"WRONG NAMESPACE ERROR: {exc}")
except Exception as exc:
client.disconnect()
raise Exception(f"UNHANDLED ERROR: {type(exc)} {exc}")

try:
var = client.nodes.root.get_child(
f"0:Objects/{nsidx}:{object_name}/{nsidx}:{variable_name}"
)
except BadNoMatch as exc:
client.disconnect()
raise Exception(f"WRONG OBJECT OR PROPERTY ERROR: {exc}")
except Exception as exc:
client.disconnect()
raise Exception(f"UNHANDLED ERROR: {type(exc)} {exc}")

try:
value = var.read_value()
except BadTypeMismatch as exc:
client.disconnect()
raise Exception(f"WRONG TYPE ERROR: {exc}")
except Exception as exc:
client.disconnect()
raise Exception(f"UNHANDLED ERROR: {type(exc)} {exc}")

client.disconnect()
return value


@add_to_workflows_gallery(
category="Basic Workflows",
use_case_title="Workflow writing data to OPC server",
use_case_description="""
In this example data is written to OPC server.
In order to write to OPC this block is making use of [asyncua](https://github.com/FreeOpcUa/opcua-asyncio) package.
Writing to OPC enables workflows to expose insights extracted from camera to PLC controllers
allowing factory automation engineers to take advantage of machine vision when building PLC logic.
""",
workflow_definition=WORKFLOW_OPC_WRITER,
workflow_name_in_app="opc_writer",
)
def test_workflow_with_classical_pattern_matching() -> None:
# given
loop = asyncio.new_event_loop()
t = threading.Thread(target=start_loop, args=(loop,), daemon=True)
t.start()

opc_url = "opc.tcp://localhost:4840/freeopcua/server/"
opc_namespace = "http://examples.freeopcua.github.io"
opc_user_name = "user1"
opc_password = users_db[opc_user_name]
opc_object_name = "MyObject1"
opc_variable_name = "MyVariable1"
opc_initial_value = 1

asyncio.run_coroutine_threadsafe(
start_test_opc_server(
url=opc_url,
namespace=opc_namespace,
object_name=opc_object_name,
variable_name=opc_variable_name,
initial_value=opc_initial_value,
),
loop,
)

execution_engine = ExecutionEngine.init(
workflow_definition=WORKFLOW_OPC_WRITER,
init_parameters={},
max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS,
)

while not OPC_SERVER_STARTED:
time.sleep(0.1)

# when
result = execution_engine.run(
runtime_parameters={
"opc_url": opc_url,
"opc_namespace": opc_namespace,
"opc_user_name": opc_user_name,
"opc_password": opc_password,
"opc_object_name": opc_object_name,
"opc_variable_name": opc_variable_name,
"opc_value": 41,
}
)

result_value = _opc_connect_and_read_value(
url=opc_url,
namespace=opc_namespace,
user_name=opc_user_name,
password=opc_password,
object_name=opc_object_name,
variable_name=opc_variable_name,
timeout=1,
)

STOP_OPC_SERVER = True
loop.stop()

assert set(result[0].keys()) == {
"opc_writer_results",
}, "Expected all declared outputs to be delivered"
assert result[0]["opc_writer_results"]["error_status"] == False
assert result[0]["opc_writer_results"]["disabled"] == False
assert result[0]["opc_writer_results"]["throttling_status"] == False
assert result[0]["opc_writer_results"]["message"] == "Value set successfully"
assert result_value == 41

0 comments on commit bc67eab

Please sign in to comment.