diff --git a/.gitignore b/.gitignore
index 567609b..315b756 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
+*.gcov
build/
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2f0d013..94985a4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,4 +1,4 @@
-cmake_minimum_required(VERSION 3.0)
+cmake_minimum_required(VERSION 3.18.1)
project(sysrepo-gnxi)
@@ -12,8 +12,11 @@ set(SYSREPO-GNXI_MICRO_VERSION 0)
set(SYSREPO-GNXI_VERSION ${SYSREPO-GNXI_MAJOR_VERSION}.${SYSREPO-GNXI_MINOR_VERSION}.${SYSREPO-GNXI_MICRO_VERSION})
-#C++11 is required by gNXI
-set(CMAKE_CXX_STANDARD 11)
+#C++11 is required by gNXI, we need c++20 for other stuff
+set(CMAKE_CXX_STANDARD 20)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+include(GNUInstallDirs)
+include(CheckIncludeFileCXX)
# DEPENDENCIES
##############
@@ -23,20 +26,36 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/CmakeModules")
find_package(PkgConfig) #official cmake module
find_package(Boost REQUIRED log system) #just boost-log and boost-system libraries
-
-pkg_check_modules(JSONCPP REQUIRED jsoncpp) #official pkgconfig jsoncpp
+find_package(Catch2 CONFIG REQUIRED)
+message(STATUS "Catch2 found (version ${Catch2_VERSION})")
+LIST(APPEND CMAKE_REQUIRED_INCLUDES "/usr/include/catch2")
+if(Catch2_VERSION_MAJOR EQUAL 3)
+ check_include_file_cxx("catch_all.hpp" CATCH_HEADER_FOUND)
+ add_definitions(-DUSE_CATCH_ALL)
+ set(CATCH2_LIBRARY Catch2)
+else()
+ check_include_file_cxx("catch.hpp" CATCH_HEADER_FOUND)
+endif()
+if(NOT CATCH_HEADER_FOUND)
+ message(FATAL_ERROR "No usable Catch2 header found")
+endif()
+
+pkg_check_modules(SYSREPOC REQUIRED sysrepo>=2.2.14 IMPORTED_TARGET)
+
+pkg_check_modules(LIBYANGC REQUIRED libyang)
+pkg_check_modules(SYSREPO REQUIRED sysrepo-cpp)
pkg_check_modules(LIBYANG REQUIRED libyang-cpp)
-pkg_check_modules(SYSREPO REQUIRED libSysrepo-cpp=>0.7.7) #PkgConfig cmake module maccro
+include(CodeCoverage)
# DEPENDENCIES & COMPILATION OF GNXI PROTO
##########################################
# set compiler and linker flags
set(RIGOROUS_C_FLAGS "-Wlogical-op -Wold-style-cast")
-set(CMAKE_CXX_FLAGS "-Wall -Wextra -std=c++11 -g ${RIGOROUS_C_FLAGS}")
-set(CMAKE_CXX_FLAGS_RELEASE "-Wall -Wextra -std=c++11 -DNDEBUG -O2 ${RIGOROUS_C_FLAGS}")
-set(CMAKE_CXX_FLAGS_DEBUG "-Wall -Wextra -std=c++11 -g -O0 ${RIGOROUS_C_FLAGS}")
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wno-deprecated-declarations -std=c++20 -g ${RIGOROUS_C_FLAGS}")
+set(CMAKE_CXX_FLAGS_RELEASE "-DNDEBUG -O2")
+set(CMAKE_CXX_FLAGS_DEBUG "-O0")
#Use Boost with Dynamic libraries rather than static linkage
add_definitions(-D BOOST_LOG_DYN_LINK)
@@ -52,27 +71,32 @@ add_subdirectory(proto)
# Generate a compile_commands.json with compile options
set(CMAKE_EXPORT_COMPILE_COMMANDS 1)
-set(GNXI_SRC src/main.cpp
- src/security/authentication.cpp
+set(GNXI_SRC src/security/authentication.cpp
src/utils/log.cpp
+ src/utils/sysrepo.cpp
src/gnmi/gnmi.cpp
+ src/gnmi/gnmi_server.cpp
src/gnmi/capabilities.cpp
src/gnmi/get.cpp
+ src/gnmi/rpc.cpp
src/gnmi/set.cpp
src/gnmi/subscribe.cpp
+ src/gnmi/confirm.cpp
src/gnmi/encode/encode.cpp
- src/gnmi/encode/load_models.cpp
- src/gnmi/encode/runtime.cpp
src/gnmi/encode/json_ietf.cpp
)
-add_executable(gnxi_server ${GNXI_SRC})
+add_executable(gnxi_server ${GNXI_SRC} src/main.cpp)
+
+target_compile_definitions(gnxi_server PRIVATE
+ GNMI_LOGROTATE_SCRIPT="/usr/libexec/gnxi-server-logrotate"
+ GNMI_LOG_DIR="/var/log/gnxi-server"
+)
#Header file location required to build target
target_include_directories(gnxi_server
PUBLIC #List of include dirs required to use target binary or library
${Boost_INCLUDE_DIRS}
- ${JSONCPP_INCLUDE_DIRS}
${LIBYANG_INCLUDE_DIRS}
${SYSREPO_INCLUDE_DIRS}
${PROTOBUF_INCLUDE_DIR}
@@ -82,51 +106,88 @@ target_include_directories(gnxi_server
)
#Directory path to look for libraries
-link_directories(${Boost_LIBRARY_DIRS})
+link_directories(
+ ${Boost_LIBRARY_DIRS}
+ ${SYSREPO_LIBRARY_DIRS}
+ ${SYSREPOC_LIBRARY_DIRS}
+ ${LIBYANG_LIBRARY_DIRS}
+ ${LIBYANGC_LIBRARY_DIRS}
+)
-# link gnxi_server executable with grpc, jsoncpp, sysrepo libraries
+# link gnxi_server executable with grpc, sysrepo libraries
target_link_libraries(gnxi_server gnmi
- ${JSONCPP_LIBRARIES}
${Boost_LIBRARIES}
${SYSREPO_LIBRARIES}
+ ${SYSREPOC_LIBRARIES}
${LIBYANG_LIBRARIES}
+ ${LIBYANGC_LIBRARIES}
+)
+
+# TEST
+######
+
+enable_testing()
+
+set(GNXI_TEST_SRC tests/main.cpp
+ tests/capabilities.cpp
+ tests/get.cpp
+ tests/rpc.cpp
+ tests/set.cpp
+ tests/subscribe.cpp
+ )
+
+add_executable(gnxi_server_test ${GNXI_TEST_SRC} ${GNXI_SRC})
+configure_file(scripts/gnxi-server-logrotate gnxi-server-logrotate COPYONLY)
+
+set_target_properties(gnxi_server_test PROPERTIES COMPILE_FLAGS "-DTEST -O0 -fprofile-arcs -ftest-coverage")
+
+get_filename_component(TESTS_SRC_DIR "${CMAKE_SOURCE_DIR}/tests" REALPATH)
+get_filename_component(TESTS_WORKING_DIR "${CMAKE_BINARY_DIR}" REALPATH)
+target_compile_definitions(gnxi_server_test PRIVATE
+ TESTS_SRC_DIR="${TESTS_SRC_DIR}";
+ TESTS_WORKING_DIR="${TESTS_WORKING_DIR}"
+ GNMI_LOG_DIR="${TESTS_WORKING_DIR}/gnmi-logs"
+ GNMI_LOGROTATE_SCRIPT="${TESTS_WORKING_DIR}/gnxi-server-logrotate"
)
+add_test(NAME gnxi_server_test COMMAND ${CMAKE_BINARY_DIR}/gnxi_server_test)
+
+# Add a timeout for tests, to prevent them from running forever
+set_tests_properties(gnxi_server_test PROPERTIES TIMEOUT 300)
+
+#Header file location required to build target
+target_include_directories(gnxi_server_test
+ PUBLIC #List of include dirs required to use target binary or library
+ ${Boost_INCLUDE_DIRS}
+ ${LIBYANG_INCLUDE_DIRS}
+ ${SYSREPO_INCLUDE_DIRS}
+ ${PROTOBUF_INCLUDE_DIR}
+ PRIVATE
+ ${CMAKE_CURRENT_BINARY_DIR} #include "build" directory tree for "build/proto"
+ ${CMAKE_CURRENT_SOURCE_DIR}/src #include "src" tree for
+)
+
+# link gnxi_server executable with grpc, sysrepo libraries
+target_link_libraries(gnxi_server_test gnmi
+ ${Boost_LIBRARIES}
+ ${SYSREPO_LIBRARIES}
+ ${SYSREPOC_LIBRARIES}
+ ${LIBYANG_LIBRARIES}
+ ${LIBYANGC_LIBRARIES}
+ ${SYSTEMD_LIBRARIES}
+ ${CATCH2_LIBRARY}
+ gcov
+)
+
+# Remove test files and auto-generated files from coverage report
+set(GCOVR_EXCLUDES "'tests/*'" "'${CMAKE_CURRENT_BINARY_DIR}/*/*'")
+setup_target_for_coverage_cobertura(gnxi_server_test_coverage ${CMAKE_CURRENT_BINARY_DIR}/gnxi_server_test coverage)
+add_dependencies(gnxi_server_test_coverage gnxi_server_test)
+
# INSTALLATION
##############
install(TARGETS gnxi_server
- RUNTIME DESTINATION bin
- DESTINATION ${CMAKE_INSTALL_BINDIR}
+ RUNTIME DESTINATION sbin
+ DESTINATION ${CMAKE_INSTALL_SBINDIR}
)
-
-# PACKAGING
-###########
-
-SET(CPACK_PACKAGE_VENDOR "FD.io")
-SET(CPACK_PACKAGE_CONTACT "ypiperea@cisco.com")
-SET(CPACK_PACKAGE_VERSION_MAJOR ${SYSREPO-GNXI_MAJOR_VERSION})
-SET(CPACK_PACKAGE_VERSION_MINOR ${SYSREPO-GNXI_MINOR_VERSION})
-SET(CPACK_PACKAGE_VERSION_PATCH ${SYSREPO-GNXI_MICRO_VERSION})
-SET(CPACK_PACKAGE_DESCRIPTION_SUMMARY "A gNMI server for sysrepo YANG datastore.")
-SET(CPACK_SOURCE_PACKAGE_FILE_NAME "https://github.com/YohanPipereau/sysrepo-gnxi")
-
-#DEBIAN specific : SET(CPACK_GENERATOR "DEB")
-if(CPACK_GENERATOR EQUAL "DEB")
- message(STATUS "DEB packaging selected")
- SET(CPACK_DEBIAN_PACKAGE_ARCHITECTURE "x86_64")
- SET(CPACK_DEBIAN_PACKAGE_SECTION "misc")
- SET(CPACK_DEBIAN_PACKAGE_PRIORITY "optional")
- SET(CPACK_DEBIAN_PACKAGE_HOMEPAGE ${CPACK_SOURCE_PACKAGE_FILE_NAME})
- SET(CPACK_DEBIAN_PACKAGE_DEPENDS "libboost-thread, libboost-log, libboost-system, libjsoncpp, libstdc++6 (>= 5.2), zlib1g, libssl, libyang-cpp0.16, sysrepo-cpp")
-endif(CPACK_GENERATOR EQUAL "DEB")
-
-#RPM specific : SET(CPACK_GENERATOR "RPM")
-if(CPACK_GENERATOR EQUAL "RPM")
- message(STATUS "RPM packaging selected")
- SET(CPACK_RPM_PACKAGE_ARCHITECTURE "x86_64")
- SET(CPACK_RPM_PACKAGE_URL ${CPACK_SOURCE_PACKAGE_FILE_NAME})
- SET(CPACK_RPM_PACKAGE_REQUIRES "boost, jsoncpp, libstdc++6 (>= 5.2), zlib, openssl-devel")
-endif(CPACK_GENERATOR EQUAL "RPM")
-
-INCLUDE(CPack) #run cpack
diff --git a/CmakeModules/CodeCoverage.cmake b/CmakeModules/CodeCoverage.cmake
new file mode 100644
index 0000000..0c86560
--- /dev/null
+++ b/CmakeModules/CodeCoverage.cmake
@@ -0,0 +1,206 @@
+# Copyright (c) 2012 - 2015, Lars Bilke
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without modification,
+# are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this
+# list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# 3. Neither the name of the copyright holder nor the names of its contributors
+# may be used to endorse or promote products derived from this software without
+# specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+#
+#
+# 2012-01-31, Lars Bilke
+# - Enable Code Coverage
+#
+# 2013-09-17, Joakim Söderberg
+# - Added support for Clang.
+# - Some additional usage instructions.
+#
+# USAGE:
+
+# 0. (Mac only) If you use Xcode 5.1 make sure to patch geninfo as described here:
+# http://stackoverflow.com/a/22404544/80480
+#
+# 1. Copy this file into your cmake modules path.
+#
+# 2. Add the following line to your CMakeLists.txt:
+# INCLUDE(CodeCoverage)
+#
+# 3. Set compiler flags to turn off optimization and enable coverage:
+# SET(CMAKE_CXX_FLAGS "-g -O0 -fprofile-arcs -ftest-coverage")
+# SET(CMAKE_C_FLAGS "-g -O0 -fprofile-arcs -ftest-coverage")
+#
+# 3. Use the function SETUP_TARGET_FOR_COVERAGE to create a custom make target
+# which runs your test executable and produces a lcov code coverage report:
+# Example:
+# SETUP_TARGET_FOR_COVERAGE(
+# my_coverage_target # Name for custom target.
+# test_driver # Name of the test driver executable that runs the tests.
+# # NOTE! This should always have a ZERO as exit code
+# # otherwise the coverage generation will not complete.
+# coverage # Name of output directory.
+# )
+#
+# If you need to exclude additional directories from the report, specify them
+# using the LCOV_REMOVE_EXTRA variable before calling SETUP_TARGET_FOR_COVERAGE.
+# For example:
+#
+# set(LCOV_REMOVE_EXTRA "'thirdparty/*'")
+#
+# 4. Build a Debug build:
+# cmake -DCMAKE_BUILD_TYPE=Debug ..
+# make
+# make my_coverage_target
+#
+#
+
+# Check prereqs
+FIND_PROGRAM( GCOV_PATH gcov )
+FIND_PROGRAM( LCOV_PATH lcov )
+FIND_PROGRAM( GENHTML_PATH genhtml )
+FIND_PROGRAM( GCOVR_PATH gcovr PATHS ${CMAKE_SOURCE_DIR}/tests)
+
+IF(NOT GCOV_PATH)
+ MESSAGE(FATAL_ERROR "gcov not found! Aborting...")
+ENDIF() # NOT GCOV_PATH
+
+IF("${CMAKE_CXX_COMPILER_ID}" MATCHES "(Apple)?[Cc]lang")
+ IF("${CMAKE_CXX_COMPILER_VERSION}" VERSION_LESS 3)
+ MESSAGE(FATAL_ERROR "Clang version must be 3.0.0 or greater! Aborting...")
+ ENDIF()
+ELSEIF(NOT CMAKE_COMPILER_IS_GNUCXX)
+ MESSAGE(FATAL_ERROR "Compiler is not GNU gcc! Aborting...")
+ENDIF() # CHECK VALID COMPILER
+
+SET(CMAKE_CXX_FLAGS_COVERAGE
+ "-g -O0 --coverage -fprofile-arcs -ftest-coverage"
+ CACHE STRING "Flags used by the C++ compiler during coverage builds."
+ FORCE )
+SET(CMAKE_C_FLAGS_COVERAGE
+ "-g -O0 --coverage -fprofile-arcs -ftest-coverage"
+ CACHE STRING "Flags used by the C compiler during coverage builds."
+ FORCE )
+SET(CMAKE_EXE_LINKER_FLAGS_COVERAGE
+ ""
+ CACHE STRING "Flags used for linking binaries during coverage builds."
+ FORCE )
+SET(CMAKE_SHARED_LINKER_FLAGS_COVERAGE
+ ""
+ CACHE STRING "Flags used by the shared libraries linker during coverage builds."
+ FORCE )
+MARK_AS_ADVANCED(
+ CMAKE_CXX_FLAGS_COVERAGE
+ CMAKE_C_FLAGS_COVERAGE
+ CMAKE_EXE_LINKER_FLAGS_COVERAGE
+ CMAKE_SHARED_LINKER_FLAGS_COVERAGE )
+
+IF ( NOT (CMAKE_BUILD_TYPE STREQUAL "Debug" OR CMAKE_BUILD_TYPE STREQUAL "Coverage"))
+ MESSAGE( WARNING "Code coverage results with an optimized (non-Debug) build may be misleading" )
+ENDIF() # NOT CMAKE_BUILD_TYPE STREQUAL "Debug"
+
+
+# Param _targetname The name of new the custom make target
+# Param _testrunner The name of the target which runs the tests.
+# MUST return ZERO always, even on errors.
+# If not, no coverage report will be created!
+# Param _outputname lcov output is generated as _outputname.info
+# HTML report is generated in _outputname/index.html
+# Optional fourth parameter is passed as arguments to _testrunner
+# Pass them in list form, e.g.: "-j;2" for -j 2
+FUNCTION(SETUP_TARGET_FOR_COVERAGE _targetname _testrunner _outputname)
+
+ IF(NOT LCOV_PATH)
+ MESSAGE(FATAL_ERROR "lcov not found! Aborting...")
+ ENDIF() # NOT LCOV_PATH
+
+ IF(NOT GENHTML_PATH)
+ MESSAGE(FATAL_ERROR "genhtml not found! Aborting...")
+ ENDIF() # NOT GENHTML_PATH
+
+ SET(coverage_info "${CMAKE_BINARY_DIR}/${_outputname}.info")
+ SET(coverage_cleaned "${coverage_info}.cleaned")
+
+ SEPARATE_ARGUMENTS(test_command UNIX_COMMAND "${_testrunner}")
+
+ # Setup target
+ ADD_CUSTOM_TARGET(${_targetname}
+
+ # Cleanup lcov
+ ${LCOV_PATH} --directory . --zerocounters
+
+ # Run tests
+ COMMAND ${test_command} ${ARGV3}
+
+ # Capturing lcov counters and generating report
+ COMMAND ${LCOV_PATH} --directory . --capture --output-file ${coverage_info}
+ COMMAND ${LCOV_PATH} --remove ${coverage_info} 'tests/*' '/usr/*' ${LCOV_REMOVE_EXTRA} --output-file ${coverage_cleaned}
+ COMMAND ${GENHTML_PATH} -o ${_outputname} ${coverage_cleaned}
+ COMMAND ${CMAKE_COMMAND} -E remove ${coverage_info} ${coverage_cleaned}
+
+ WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
+ COMMENT "Resetting code coverage counters to zero.\nProcessing code coverage counters and generating report."
+ )
+
+ # Show info where to find the report
+ ADD_CUSTOM_COMMAND(TARGET ${_targetname} POST_BUILD
+ COMMAND ;
+ COMMENT "Open ./${_outputname}/index.html in your browser to view the coverage report."
+ )
+
+ENDFUNCTION() # SETUP_TARGET_FOR_COVERAGE
+
+# Param _targetname The name of new the custom make target
+# Param _testrunner The name of the target which runs the tests
+# Param _outputname cobertura output is generated as _outputname.xml
+# Optional fourth parameter is passed as arguments to _testrunner
+# Pass them in list form, e.g.: "-j;2" for -j 2
+FUNCTION(SETUP_TARGET_FOR_COVERAGE_COBERTURA _targetname _testrunner _outputname)
+
+ IF(NOT GCOVR_PATH)
+ MESSAGE(FATAL_ERROR "gcovr not found! Aborting...")
+ ENDIF() # NOT GCOVR_PATH
+
+ set(GCOVR_EXCLUDE_ARGS "")
+ foreach(EXCLUDE ${GCOVR_EXCLUDES})
+ list(APPEND GCOVR_EXCLUDE_ARGS "-e")
+ list(APPEND GCOVR_EXCLUDE_ARGS "${EXCLUDE}")
+ endforeach()
+
+ ADD_CUSTOM_TARGET(${_targetname}
+
+ # Run tests
+ ${_testrunner} ${ARGV3}
+
+ # Running gcovr
+ COMMAND ${GCOVR_PATH} --gcov-ignore-parse-errors -x --exclude-unreachable-branches --print-summary -r ${CMAKE_SOURCE_DIR} ${GCOVR_EXCLUDE_ARGS} -o ${_outputname}.xml
+ WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
+ COMMENT "Running gcovr to produce Cobertura code coverage report."
+ )
+
+ # Show info where to find the report
+ ADD_CUSTOM_COMMAND(TARGET ${_targetname} POST_BUILD
+ COMMAND ;
+ COMMENT "Cobertura code coverage report saved in ${_outputname}.xml."
+ )
+
+ENDFUNCTION() # SETUP_TARGET_FOR_COVERAGE_COBERTURA
+
diff --git a/README.md b/README.md
index 8dfb682..2443253 100644
--- a/README.md
+++ b/README.md
@@ -10,6 +10,8 @@ Supported RPCs:
* [X] Set
* [X] Get
* [X] Subscribe
+* [X] Rpc
+* [X] Confirm
Supported encoding:
@@ -35,27 +37,20 @@ sysrepo-gnxi
+-- protobuf (>=3.0) #because of gnmi
+-- jsoncpp #because of get JSON
+-- grpc (cpp) (>=1.18.0) #because of TLS bug to verify client cert
-+-- libyang-cpp (>=1.0-r3) #because of feature_enable
-+-- sysrepo-cpp (>=0.7.7)
++-- libyang-cpp
++-- sysrepo-cpp
| +-- libyang
-| +-- ...
+| +-- sysrepo
```
You can either install dependencies from sources or from the packages.
-Install dependencies from source:
-
-1. If `libyang (>=1.0-r3)` is packaged on your distrib use it, else run `scripts/install-libyang.sh` to install the required version of libyang. _you can use an older version and apply commit bf1aa13ba2dfb7b5938ed2345a67de316fc34917 to it_
-2. You can run `scripts/install-sysrepo.sh` to install sysrepo. Check [here](https://github.com/sysrepo/sysrepo/blob/master/INSTALL.md) for installation instructions of sysrepo.
-
By default, grpc and protobuf are linked statically. But you can build it to have them linked dynamically.
# Install
## Install from package:
-Install deb and rpm from https://github.com/YohanPipereau/sysrepo-gnxi/releases
-
## Install from source:
```
diff --git a/conf/gnmi-server.env b/conf/gnmi-server.env
new file mode 100644
index 0000000..2e7572b
--- /dev/null
+++ b/conf/gnmi-server.env
@@ -0,0 +1,9 @@
+# Set to NO when we DO NOT want to see data values in logs
+GNMI_DISPLAY_DATA_LOG=YES
+
+# Uncomment to see (lots of) gRPC traces
+# GRPC_VERBOSITY=DEBUG
+# GRPC_TRACE=all
+
+# override defaults for GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH and GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH
+GNMI_MAX_MSG_SIZE_KB=65536
diff --git a/proto/CMakeLists.txt b/proto/CMakeLists.txt
index 1846144..b957c5d 100644
--- a/proto/CMakeLists.txt
+++ b/proto/CMakeLists.txt
@@ -35,7 +35,7 @@ PROTOBUF_GENERATE_GRPC_CPP(gnmi_ext_grpc_srcs gnmi_ext_grpc_hdrs ${gnmi_ext_prot
#Create a new library named gnmi
add_library(gnmi ${gnmi_grpc_srcs} ${gnmi_ext_grpc_srcs} ${gnmi_proto_srcs} ${gnmi_ext_proto_srcs})
-set(DYNAMIC_LINK_GRPC OFF)
+set(DYNAMIC_LINK_GRPC ON)
if(DYNAMIC_LINK_GRPC)
message(STATUS "DYNAMICALLY LINK gRPC")
diff --git a/proto/gnmi.proto b/proto/gnmi.proto
index 561a087..0076589 100644
--- a/proto/gnmi.proto
+++ b/proto/gnmi.proto
@@ -1,5 +1,6 @@
//
// Copyright 2016 Google Inc. All Rights Reserved.
+// Copyright 2025 Graphiant Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -64,6 +65,18 @@ service gNMI {
// (POLL), or sent as a one-off retrieval (ONCE).
// Reference: gNMI Specification Section 3.5
rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeResponse);
+
+ // Extensions to gNMI specification:
+
+ // Confirm allows a client to Confirm with the target a previous Set
+ // operation. If not confirmed, the previous configuration is reapplied.
+ // This is a extension to gNMI specification.
+ rpc Confirm(ConfirmRequest) returns (ConfirmResponse);
+ // Rpc allows a client to invoke an RPC in the tree as specified by the
+ // path included in the message and serialises the response to be returned
+ // to the client using the specified encoding.
+ // This is a extension to gNMI specification.
+ rpc Rpc(RpcRequest) returns (RpcResponse);
}
// Notification is a re-usable message that is used to encode data from the
@@ -327,6 +340,12 @@ message AliasList {
repeated Alias alias = 1; // The set of aliases to be created.
}
+// Presence of this message indicates that the Set RPC needs to be confirmed
+// If Confirm RPC is not sent in time, rollback occurs on timeout
+message ConfirmParmsRequest {
+ uint32 timeout_secs = 1; // Requested timeout. See ConfirmParmsResponse for target's response
+ bool ignore_system_state = 2; // If true, do not reject Set based on SystemState
+}
// SetRequest is sent from a client to the target to update values in the data
// tree. Paths are either deleted by the client, or modified by means of being
// updated, or replaced. Where a replace is used, unspecified values are
@@ -342,8 +361,20 @@ message SetRequest {
// Extension messages associated with the SetRequest. See the
// gNMI extension specification for further definition.
repeated gnmi_ext.Extension extension = 5;
+
+ // Start of proprietary messages
+
+ // Requested Confirm parameters
+ ConfirmParmsRequest confirm = 666;
+ // Transaction-id (non-zero)
+ uint64 transaction_id = 667;
}
+// Target's response to client's request
+message ConfirmParmsResponse {
+ uint32 min_wait_secs = 1; // Client has to wait at least this long before sending Confirm RPC
+ uint32 timeout_secs = 2; // Client has this amount of time to send Confirm after Set
+}
// SetResponse is the response to a SetRequest, sent from the target to the
// client. It reports the result of the modifications to the data tree that were
// specified by the client. Errors for this RPC should be reported using the
@@ -361,6 +392,11 @@ message SetResponse {
// Extension messages associated with the SetResponse. See the
// gNMI extension specification for further definition.
repeated gnmi_ext.Extension extension = 5;
+
+ // Start of proprietary messages
+
+ // This must be present if ConfirmParmsRequest is present in the corresponding request
+ ConfirmParmsResponse confirm = 666;
}
// UpdateResult is used within the SetResponse message to communicate the
@@ -454,3 +490,32 @@ message ModelData {
string organization = 2; // Organization publishing the model.
string version = 3; // Semantic version of the model.
}
+
+// ConfirmRequest is sent by the client in the Confirm RPC to request
+// that the target confirm the last Set RPC.
+message ConfirmRequest {
+ bool ignore_system_state = 1; // If true, do not reject Confirm based on SystemState
+}
+// ConfirmResponse is used by the target to respond to a Confirm RPC
+// sent by the Client.
+message ConfirmResponse {
+}
+
+// RpcRequest is sent by the client in the Rpc RPC to request
+// that the target invoke a YANG RPC/action as defined in the schema.
+message RpcRequest {
+ Path path = 1; // The path to the RPC to invoke.
+ // The explicitly typed input value. Only JSON_IETF is supported currently.
+ TypedValue val = 2;
+ // The encoding that the target should use for the response.
+ Encoding encoding = 3;
+ // Timeout to use on the server side call in seconds
+ uint32 timeout = 4;
+}
+// RpcResponse is used by the target to respond to an Rpc RPC sent by the
+// Client.
+message RpcResponse {
+ int64 timestamp = 1; // Timestamp in nanoseconds since Epoch.
+ // The explicitly typed output value. Only JSON_IETF is supported currently.
+ TypedValue val = 2;
+}
diff --git a/scripts/gnxi-server-logrotate b/scripts/gnxi-server-logrotate
new file mode 100755
index 0000000..d75fffd
--- /dev/null
+++ b/scripts/gnxi-server-logrotate
@@ -0,0 +1,63 @@
+#!/bin/bash -e
+#
+# Copyright 2025 Graphiant Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+LOCKFILE="/tmp/gnmi-logrotate.lock"
+MAX_LOG_BYTES=$((2 * 1024 * 1024))
+MAX_ARCHIVES_BYTES=$((8 * 1024 * 1024))
+
+if [ $# -ne 1 ]; then
+ echo "Usage: $0 "
+ exit 1
+fi
+
+GNXI_LOGDIR="$1"
+test -d "$GNXI_LOGDIR" || ( echo "GNXI_LOGDIR $GNXI_LOGDIR does not exist" && exit 1 )
+
+dotlockfile -p -r 0 -l "$LOCKFILE"
+
+# inspect raw log size
+cd "$GNXI_LOGDIR/raw"
+
+# skip temporary files in the calculation
+# and can cause unnecessary stderr messages like cannot access file
+size=$(du -sb --exclude='*.tmp' --exclude='*.live' | cut -f1)
+test "$size" -gt "$MAX_LOG_BYTES" || exit 0 # Only archive if taking up more than 2MB collectively
+
+# check and rename any abandoned .live files to .log
+find . -name '*.live' -exec flock -o -n "{}" mv "{}" "{}.log" \;
+
+# archive all log files
+archive_name="$GNXI_LOGDIR/archives/archive.$(date +%F.%Hh%Mm%Ss.%Nns).tar.zst"
+find . -name '*.log' -exec tar --zstd -cf "$archive_name" {} + -exec rm -f {} +
+
+# inspect archive size
+cd "$GNXI_LOGDIR/archives"
+used_bytes=$(du -sb --exclude='*.tmp' | cut -f1)
+
+test "$used_bytes" -gt "$MAX_ARCHIVES_BYTES" || exit 0 # Only purge if taking up excess space
+
+must_delete=$((used_bytes - MAX_ARCHIVES_BYTES))
+deleted_bytes=0
+
+for f in $(find . -name '*.tar.zst' | sort -n); do
+ sz=$(stat -c %s "$f")
+ rm -f "$f"
+ deleted_bytes=$((sz + deleted_bytes))
+ test "$deleted_bytes" -lt "$must_delete" || break
+done
+
+dotlockfile -u "$LOCKFILE"
diff --git a/scripts/install-grpc.sh b/scripts/install-grpc.sh
deleted file mode 100755
index f2eee98..0000000
--- a/scripts/install-grpc.sh
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/bin/bash
-
-BR="/tmp"
-VER="v1.18.0"
-
-##########
-# GRPC++ #
-##########
-
-mkdir -p ${BR}/downloads/ && cd ${BR}/downloads/
-
-git clone --depth=1 -b ${VER} https://github.com/grpc/grpc
-cd grpc && git submodule update --init
-
-#install protobuf
-cd third_party/protobuf
-./autogen.sh && ./configure && make install
-
-ldconfig
-
-#install grpc
-cd ../.. && make && make install
diff --git a/scripts/install-libyang.sh b/scripts/install-libyang.sh
deleted file mode 100755
index 7bdbe25..0000000
--- a/scripts/install-libyang.sh
+++ /dev/null
@@ -1,23 +0,0 @@
-#!/bin/bash
-
-BR="/tmp"
-
-###########
-# LIBYANG #
-###########
-
-mkdir -p ${BR}/downloads/ && cd ${BR}/downloads/
-
-# We need commit bf1aa13ba2dfb7b5938ed2345a67de316fc34917
-git clone https://github.com/CESNET/libyang/
-cd libyang
-
-mkdir -p build && cd build
-
-cmake -DCMAKE_BUILD_TYPE:String="Release" -DCMAKE_INSTALL_PREFIX:PATH=/usr \
--DGEN_LANGUAGE_BINDINGS=ON -DGEN_CPP_BINDINGS=ON \
--DGEN_PYTHON_BINDINGS=OFF -DBUILD_EXAMPLES=OFF \
--DENABLE_BUILD_TESTS=OFF ..
-
-make -j$(nproc)
-make install
diff --git a/scripts/install-sysrepo.sh b/scripts/install-sysrepo.sh
deleted file mode 100755
index 2520b3a..0000000
--- a/scripts/install-sysrepo.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-
-BR="/tmp"
-
-###########
-# SYSREPO #
-###########
-
-mkdir -p ${BR}/downloads/ && cd ${BR}/downloads/
-
-wget https://github.com/sysrepo/sysrepo/archive/v0.7.7.tar.gz
-tar xvf v0.7.7.tar.gz && cd sysrepo-0.7.7
-
-mkdir -p build && cd build
-
-#Without NACM
-cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX:PATH=/usr \
--DGEN_LANGUAGE_BINDINGS=ON -DGEN_CPP_BINDINGS=ON -DGEN_LUA_BINDINGS=OFF \
--DGEN_PYTHON_BINDINGS=OFF -DGEN_JAVA_BINDINGS=OFF -DBUILD_EXAMPLES=OFF \
--DENABLE_TESTS=OFF ..
-
-#With NACM
-#cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX:PATH=/usr \
-#-DGEN_LANGUAGE_BINDINGS=ON -DGEN_CPP_BINDINGS=ON -DGEN_LUA_BINDINGS=OFF \
-#-DGEN_PYTHON_BINDINGS=OFF -DGEN_JAVA_BINDINGS=OFF -DBUILD_EXAMPLES=OFF \
-#-DENABLE_TESTS=OFF -DENABLE_NACM=ON ..
-
-make
-make install
diff --git a/src/gnmi/capabilities.cpp b/src/gnmi/capabilities.cpp
index e825b02..bd83d04 100644
--- a/src/gnmi/capabilities.cpp
+++ b/src/gnmi/capabilities.cpp
@@ -1,5 +1,6 @@
/*
* Copyright 2020 Yohan Pipereau
+ * Copyright 2025 Graphiant Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +20,6 @@
using namespace gnmi;
using namespace std;
-using sysrepo::Yang_Schemas;
using google::protobuf::FileOptions;
Status GNMIService::Capabilities(ServerContext *context,
@@ -27,7 +27,6 @@ Status GNMIService::Capabilities(ServerContext *context,
CapabilityResponse* response)
{
(void)context;
- shared_ptr schemas;
string gnmi_version;
FileOptions fopts;
@@ -37,12 +36,21 @@ Status GNMIService::Capabilities(ServerContext *context,
}
try {
- schemas = sr_sess->list_schemas();
-
- for (unsigned int i = 0; i < schemas->schema_cnt(); i++) {
- auto model = response->add_supported_models();
- model->set_name(schemas->schema(i)->module_name());
- model->set_version(schemas->schema(i)->revision()->revision());
+ auto sess = sr_con.sessionStart();
+ auto node = sess.getModuleInfo();
+ for (auto mod_node : node.child()->siblings()) {
+ auto model = response->add_supported_models();
+ for (auto mod_value_node = mod_node.child(); mod_value_node.has_value();
+ mod_value_node = mod_value_node.value().nextSibling()) {
+ if (!mod_value_node->schema().name().compare("name")) {
+ auto name = std::string(mod_value_node->asTerm().valueStr());
+ model->set_name(name);
+ }
+ if (!mod_value_node->schema().name().compare("revision")) {
+ auto version = std::string(mod_value_node->asTerm().valueStr());
+ model->set_version(version);
+ }
+ }
}
gnmi_version = response->GetDescriptor()->file()->options()
diff --git a/src/gnmi/confirm.cpp b/src/gnmi/confirm.cpp
new file mode 100644
index 0000000..c0f00a8
--- /dev/null
+++ b/src/gnmi/confirm.cpp
@@ -0,0 +1,275 @@
+/*
+ * Copyright 2025 Graphiant Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+
+#include
+#include
+#include
+
+#include "confirm.h"
+#include
+using std::string;
+#include
+#include
+#include
+
+using namespace std;
+using google::protobuf::RepeatedPtrField;
+
+namespace impl {
+
+// Large timeout value to be used when there's nothing to timeout
+#define LARGE_TIMEOUT_SECS (7 * 24 * 60 * 60)
+
+// Implements gNMI Confirm RPC
+Status Confirm::run(const ConfirmRequest *request, ConfirmResponse *response) {
+ (void)request; // unused
+ (void)response; // unused
+ Status status;
+
+ if (not conf_state_->get_wait_confirm()) {
+ // We are not expecting for a Confirm RPC
+ std::string err_str = "Not expecting Confirm RPC";
+ return Status(StatusCode::FAILED_PRECONDITION, err_str);
+ }
+ // Now make sure enough time has elapsed
+ uint64_t earliest_confirm_time_nsecs =
+ conf_state_->get_earliest_confirm_time_nsecs();
+ uint64_t crnt_time_ns = get_time_nanosec();
+ if (crnt_time_ns < earliest_confirm_time_nsecs) {
+ std::string err_str =
+ "Confirm RPC too soon by " +
+ std::to_string(earliest_confirm_time_nsecs - crnt_time_ns) + " nsecs";
+ return Status(StatusCode::UNAVAILABLE, err_str);
+ }
+ BOOST_LOG_TRIVIAL(debug) << "Ignore-system-state: " << request->ignore_system_state();
+ if (not request->ignore_system_state()) {
+ // We have to check system state
+ }
+
+ try {
+ sr_sess_startup_.copyConfig(sysrepo::Datastore::Running);
+ } catch (sysrepo::ErrorWithCode &e) {
+ BOOST_LOG_TRIVIAL(error) << "Copy from running config to startup config failed: "
+ << e.what()
+ << ". Transaction-id:"
+ << conf_state_->read_set_transaction_id();
+ return Status(StatusCode::ABORTED, e.what());
+ }
+
+ // The last succesful set transaction has been confirmed
+ conf_state_->write_confirmed_transaction_id(
+ conf_state_->read_set_transaction_id());
+
+ // All good, clear state
+ conf_state_->clr_wait_confirm();
+ return Status::OK;
+}
+
+// Callback function for timer expiry
+static void check_confirm_expiry_cb(const boost::system::error_code &e, ConfirmState *conf_state) {
+ if (conf_state->get_wait_confirm() and
+ (e != boost::asio::error::operation_aborted)) {
+ // Restore config only if the wait was not stopped
+ conf_state->restore_config();
+ }
+}
+
+// Handles confirm timeout or failure by restoring config
+void ConfirmState::restore_config() {
+ std::unique_lock lock(mutex_);
+ BOOST_LOG_TRIVIAL(error) << "Restoring config: no valid Confirm RPC received";
+ if (cfg_snapshot_.has_value()) {
+ std::string cfg_snapshot_json =
+ cfg_snapshot_->printStr(libyang::DataFormat::JSON, libyang::PrintFlags::WithSiblings).value();
+ // Restore config
+ try {
+ sr_sess_.replaceConfig(std::nullopt, cfg_snapshot_.value());
+ // Restore transcation-id
+ write_set_transaction_id(read_confirmed_transaction_id());
+ } catch (const std::exception& e) {
+ // Yikes
+ BOOST_LOG_TRIVIAL(error) << e.what();
+ }
+ // Not waiting for confirm anymore
+ clr_wait_confirm_no_lock_();
+ } else {
+ BOOST_LOG_TRIVIAL(error) << "No config snapshot to restore";
+ }
+}
+// Loop to check confirm timeout
+void ConfirmState::check_confirm_loop(ConfirmState *conf_state) {
+ BOOST_LOG_TRIVIAL(debug) << "Commit confirm timer thread started";
+
+ while (not conf_state->timer_thread_exit_) {
+ boost::asio::deadline_timer timer(
+ conf_state->io_,
+ boost::posix_time::seconds(conf_state->get_timeout_secs()));
+
+ timer.async_wait(boost::bind(check_confirm_expiry_cb, boost::asio::placeholders::error, conf_state));
+ // This is blocking
+ conf_state->io_.run();
+ // Reset
+ conf_state->io_.reset();
+ }
+
+ BOOST_LOG_TRIVIAL(debug) << "Commit confirm timer thread exited";
+}
+
+ConfirmState* ConfirmState::singleton_ = nullptr;
+
+sysrepo::Session ConfirmState::createSession(sysrepo::Connection conn)
+{
+ try {
+ return conn.sessionStart();
+ } catch (const std::exception& exc) {
+ BOOST_LOG_TRIVIAL(error) << "Connection to sysrepo failed " << exc.what();
+ exit(1);
+ }
+ BOOST_LOG_TRIVIAL(debug) << "Commit confirm timer thread exited";
+}
+
+// Constructor for ConfirmState
+ConfirmState::ConfirmState(sysrepo::Connection conn) : sr_sess_(createSession(conn)) {
+
+ reset_default_timeout_secs();
+ reset_min_wait_conf_secs();
+ timeout_secs_ = LARGE_TIMEOUT_SECS;
+ wait_confirm_ = false;
+ timer_thread_exit_ = false;
+ timer_thread_ = std::thread(check_confirm_loop, this);
+ singleton_ = this;
+}
+
+ConfirmState::~ConfirmState() {
+ timer_thread_exit_ = true;
+ io_.stop();
+ timer_thread_.join();
+ singleton_ = nullptr;
+}
+
+bool ConfirmState::get_wait_confirm() {
+ std::shared_lock lock(mutex_);
+ return wait_confirm_;
+}
+
+// Takes config snapshot, resets timer etc
+bool ConfirmState::set_wait_confirm(uint32_t timeout_secs, std::string &err_msg) {
+ std::unique_lock lock(mutex_);
+ if (wait_confirm_) {
+ // Already waiting
+ err_msg = "Already waiting for Confirm RPC";
+ BOOST_LOG_TRIVIAL(error) << err_msg;
+ return false;
+ }
+
+ // Get snapshot of current config
+ cfg_snapshot_ = sr_sess_.getData("/*");
+
+ wait_confirm_ = true;
+
+ set_timeout_secs(timeout_secs);
+ reset_timers();
+
+ return true;
+}
+void ConfirmState::reset_timers() {
+ // Earliest time at which Confirm is accepted is now + min wait time
+ auto crnt_time = get_time_nanosec();
+ earliest_confirm_time_nsecs_ =
+ crnt_time + (static_cast(min_wait_conf_secs_) * 1000000000ull);
+
+ // Stop the confirm timer so that it gets restarted
+ io_.stop();
+}
+void ConfirmState::clr_wait_confirm() {
+ std::unique_lock lock(mutex_);
+ clr_wait_confirm_no_lock_();
+}
+// Useful when caller already has lock
+void ConfirmState::clr_wait_confirm_no_lock_() {
+ wait_confirm_ = false;
+ cfg_snapshot_ = std::nullopt;
+ timeout_secs_ = LARGE_TIMEOUT_SECS;
+ io_.stop();
+}
+
+uint32_t ConfirmState::get_timeout_secs() {
+ std::shared_lock lock(mutex_);
+
+ return (timeout_secs_);
+}
+uint32_t ConfirmState::get_min_wait_conf_secs() {
+ std::shared_lock lock(mutex_);
+
+ return (min_wait_conf_secs_);
+}
+void ConfirmState::set_default_timeout_secs(uint32_t value) {
+ std::unique_lock lock(mutex_);
+
+ default_timeout_secs_ = value;
+}
+void ConfirmState::reset_default_timeout_secs() {
+ std::unique_lock lock(mutex_);
+
+ default_timeout_secs_ = 300;
+}
+void ConfirmState::set_min_wait_conf_secs(uint32_t value) {
+ std::unique_lock lock(mutex_);
+
+ min_wait_conf_secs_ = value;
+}
+void ConfirmState::reset_min_wait_conf_secs() {
+ std::unique_lock lock(mutex_);
+
+ min_wait_conf_secs_ = 30;
+}
+void ConfirmState::set_timeout_secs(uint32_t timeout_secs) {
+ if ((timeout_secs == 0) or (timeout_secs < min_wait_conf_secs_)) {
+ // No value or too small value was provided, use default
+ timeout_secs_ = default_timeout_secs_;
+ } else {
+ timeout_secs_ = timeout_secs;
+ }
+}
+uint64_t ConfirmState::get_earliest_confirm_time_nsecs() {
+ std::unique_lock lock(mutex_);
+
+ return earliest_confirm_time_nsecs_;
+}
+uint32_t ConfirmState::get_num_events_service_failures() {
+ return num_events_service_failures_;
+}
+uint64_t ConfirmState::read_set_transaction_id() {
+ return set_transaction_id_;
+}
+void ConfirmState::write_set_transaction_id(uint64_t id) {
+ BOOST_LOG_TRIVIAL(debug) << "write_set_transaction_id():" << id;
+ set_transaction_id_ = id;
+}
+uint64_t ConfirmState::read_confirmed_transaction_id() {
+ return confirmed_transaction_id_;
+}
+void ConfirmState::write_confirmed_transaction_id(uint64_t id) {
+ BOOST_LOG_TRIVIAL(debug) << "write_confirmed_transaction_id():" << id;
+
+ confirmed_transaction_id_ = id;
+}
+
+
+} // namespace impl
diff --git a/src/gnmi/confirm.h b/src/gnmi/confirm.h
new file mode 100644
index 0000000..4847028
--- /dev/null
+++ b/src/gnmi/confirm.h
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2025 Graphiant Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _GNMI_CONFIRM_H
+#define _GNMI_CONFIRM_H
+
+#include
+#include
+#include
+#include
+#include
+
+using namespace gnmi;
+using google::protobuf::RepeatedPtrField;
+using grpc::Status;
+using grpc::StatusCode;
+using sysrepo::Connection;
+using sysrepo::Session;
+
+namespace impl {
+
+// Class to manage the "state machine" for Confirm behaviour
+class ConfirmState {
+public:
+ ConfirmState(sysrepo::Connection conn);
+ ~ConfirmState();
+
+ // Singleton for UT purposes only
+ static ConfirmState &get_singleton() {
+ return *singleton_;
+ }
+ // Returns true if we are currently waiting for a confirm
+ bool get_wait_confirm();
+ // Returns true on success, false on error (e.g. already waiting for a confirm)
+ bool set_wait_confirm(uint32_t timeout_secs, std::string &err_msg);
+ // Not waiting for confirm anymore
+ void clr_wait_confirm();
+ // Reset the timers for Confirm (min/max)
+ void reset_timers();
+ // Earliest time in ns, since epoch, to accept Confirm
+ // Relevant when wait_confirm_ is true
+ uint64_t get_earliest_confirm_time_nsecs();
+
+ uint32_t get_timeout_secs();
+ uint32_t get_min_wait_conf_secs();
+ // Gets the stored snapshot for this counter
+ uint32_t get_num_events_service_failures();
+ // Handles Confirm timeout or failure by restoring the config
+ void restore_config();
+ // Updates/gets the transaction ids
+ uint64_t read_set_transaction_id();
+ void write_set_transaction_id(uint64_t id);
+ uint64_t read_confirmed_transaction_id();
+ void write_confirmed_transaction_id(uint64_t id);
+ // Used for testing purposes
+ void set_default_timeout_secs(uint32_t value);
+ void reset_default_timeout_secs();
+ void set_min_wait_conf_secs(uint32_t value);
+ void reset_min_wait_conf_secs();
+ void set_timeout_secs(uint32_t value);
+private:
+ // Whether we are waiting for a confirm RPC
+ bool wait_confirm_;
+ // For locking
+ std::shared_timed_mutex mutex_;
+ // For the timer
+ std::thread timer_thread_;
+ boost::asio::io_service io_;
+ bool timer_thread_exit_;
+ // Default timeout for Confirm (used when none specified)
+ uint32_t default_timeout_secs_;
+ // Timeout for Confirm
+ uint32_t timeout_secs_;
+ // Minimum time to wait before accepting a confirm
+ uint32_t min_wait_conf_secs_;
+ // Earliest time in ns, since epoch, to accept Confirm
+ // Relevant when wait_confirm_ is true
+ uint64_t earliest_confirm_time_nsecs_;
+ // session to sysrepo
+ sysrepo::Session sr_sess_;
+ // Config snapshot
+ std::optional cfg_snapshot_;
+ // Snapshot of number of times services have failed
+ uint32_t num_events_service_failures_;
+
+ // On successful Set request, set_transaction_id is updated to request content
+ // On successful Confirm request, confirmed_transaction_id is updated to
+ // set_transaction_id.
+ // On failed Set request, no transaction-id is updated
+ // On Confirm timeout, when restoring config, set_transaction_id is reset
+ // to confirmed_transaction_id
+ uint64_t set_transaction_id_;
+ uint64_t confirmed_transaction_id_;
+
+private:
+ static void check_confirm_loop(ConfirmState *conf_state);
+ // Not waiting for confirm anymore
+ void clr_wait_confirm_no_lock_();
+ sysrepo::Session createSession(sysrepo::Connection conn);
+ static ConfirmState *singleton_;
+};
+
+class Confirm {
+public:
+ Confirm(sysrepo::Session startup_sess, std::shared_ptr conf_state)
+ : sr_sess_startup_(startup_sess), conf_state_(conf_state)
+ {}
+ ~Confirm() {}
+
+ Status run(const ConfirmRequest *req, ConfirmResponse *response);
+
+private:
+ sysrepo::Session sr_sess_startup_;
+ std::shared_ptr conf_state_;
+};
+
+} // namespace impl
+
+#endif //_GNMI_CONFIRM_H
diff --git a/src/gnmi/encode/encode.cpp b/src/gnmi/encode/encode.cpp
index bacf5de..581a159 100644
--- a/src/gnmi/encode/encode.cpp
+++ b/src/gnmi/encode/encode.cpp
@@ -1,5 +1,6 @@
/*
* Copyright 2020 Yohan Pipereau
+ * Copyright 2025 Graphiant Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,189 +15,167 @@
* limitations under the License.
*/
-#include
-#include
-
-#include
-
+#include
#include "encode.h"
+#include "utils/log.h"
+#include "utils/sysrepo.h"
+using namespace gnmi;
using namespace std;
-using namespace libyang;
-using sysrepo::Val;
+using namespace grpc;
+using Status = grpc::Status;
-/*
- * Wrapper to test wether the current Data Node is a key.
- * We know that by looking in the Schema Tree.
- * @param leaf Leaf Data Node
- */
-static bool isKey(S_Data_Node_Leaf_List leaf)
+std::tuple> Encode::decode(
+ string xpath, const gnmi::TypedValue &reqval, EncodePurpose purpose)
{
- S_Schema_Node_Leaf tmp = make_shared(leaf->schema());
-
- if (tmp->is_key())
- return true;
- else
- return false;
+ switch (reqval.value_case()) {
+ case gnmi::TypedValue::ValueCase::kStringVal:
+ return std::make_tuple(Status(StatusCode::UNIMPLEMENTED, "Unsupported protobuf string type"), std::nullopt);
+ case gnmi::TypedValue::ValueCase::kIntVal:
+ return std::make_tuple(Status(StatusCode::UNIMPLEMENTED, "Unsupported protobuf int type"), std::nullopt);
+ case gnmi::TypedValue::ValueCase::kUintVal:
+ return std::make_tuple(Status(StatusCode::UNIMPLEMENTED, "Unsupported protobuf uint type"), std::nullopt);
+ case gnmi::TypedValue::ValueCase::kBoolVal:
+ return std::make_tuple(Status(StatusCode::UNIMPLEMENTED, "Unsupported protobuf bool type"), std::nullopt);
+ case gnmi::TypedValue::ValueCase::kBytesVal:
+ return std::make_tuple(Status(StatusCode::UNIMPLEMENTED, "Unsupported protobuf bytes type"), std::nullopt);
+ case gnmi::TypedValue::ValueCase::kFloatVal:
+ return std::make_tuple(Status(StatusCode::UNIMPLEMENTED, "Unsupported protobuf float type"), std::nullopt);
+ case gnmi::TypedValue::ValueCase::kDecimalVal:
+ return std::make_tuple(Status(StatusCode::UNIMPLEMENTED, "Unsupported protobuf Decimal64 type"), std::nullopt);
+ case gnmi::TypedValue::ValueCase::kLeaflistVal:
+ return std::make_tuple(Status(StatusCode::UNIMPLEMENTED, "Unsupported protobuf leaflist type"), std::nullopt);
+ case gnmi::TypedValue::ValueCase::kAnyVal:
+ return std::make_tuple(Status(StatusCode::UNIMPLEMENTED, "Unsupported PROTOBUF Encoding"), std::nullopt);
+ case gnmi::TypedValue::ValueCase::kJsonVal:
+ return std::make_tuple(Status(StatusCode::UNIMPLEMENTED, "Unsupported JSON Encoding"), std::nullopt);
+ case gnmi::TypedValue::ValueCase::kJsonIetfVal:
+ try {
+ return std::make_tuple(Status::OK, json_decode(xpath, reqval.json_ietf_val(), purpose));
+ } catch (runtime_error &err) {
+ // wrong input field must reply an error to gnmi client
+ BOOST_LOG_TRIVIAL(error) << "Run-time error:" << err.what();
+ return std::make_tuple(Status(StatusCode::INVALID_ARGUMENT, err.what()), std::nullopt);
+ } catch (invalid_argument &err) {
+ BOOST_LOG_TRIVIAL(error) << "Invalid argument:" << err.what();
+ return std::make_tuple(Status(StatusCode::INVALID_ARGUMENT, err.what()), std::nullopt);
+ }
+ break;
+ case gnmi::TypedValue::ValueCase::kAsciiVal:
+ return std::make_tuple(Status(StatusCode::UNIMPLEMENTED, "Unsupported ASCII Encoding"), std::nullopt);
+ case gnmi::TypedValue::ValueCase::kProtoBytes:
+ return std::make_tuple(Status(StatusCode::UNIMPLEMENTED, "Unsupported PROTOBUF BYTE Encoding"), std::nullopt);
+ case gnmi::TypedValue::ValueCase::VALUE_NOT_SET:
+ return std::make_tuple(Status(StatusCode::INVALID_ARGUMENT, "Value not set"), std::nullopt);
+ default:
+ return std::make_tuple(Status(StatusCode::INVALID_ARGUMENT, "Unknown value type"), std::nullopt);
+ }
}
-/*
- * Store YANG leaf in sysrepo datastore
- * @param node Describe a libyang Data Tree leaf or leaf list
- */
-void Encode::storeLeaf(libyang::S_Data_Node_Leaf_List node)
+std::tuple> Encode::update(string xpath, const TypedValue &reqval, string op)
{
- shared_ptr sval;
-
- if (isKey(node)) {
- /* If node is a key create it first by setting parent path */
- BOOST_LOG_TRIVIAL(debug) << "leaf key: " << node->path();
- return;
- } else {
- BOOST_LOG_TRIVIAL(debug) << "leaf: " << node->path();
- }
-
- switch(node->value_type()) {
- case LY_TYPE_BINARY: /* Any binary data */
- sval = make_shared(node->value()->binary(), SR_STRING_T);
- break;
- case LY_TYPE_STRING: /* Human-readable string */
- sval = make_shared(node->value()->string(), SR_STRING_T);
- break;
- case LY_TYPE_BOOL: /* "true" or "false" */
- sval = make_shared(static_cast(node->value()->bln()));
- break;
- case LY_TYPE_DEC64: /* 64-bit signed decimal number */
- sval = make_shared(static_cast(node->value()->dec64()));
- break;
- case LY_TYPE_INT8: /* 8-bit signed integer */
- sval = make_shared(node->value()->int8(), SR_INT8_T);
- //sval = make_shared(node->value()->int8());
- break;
- case LY_TYPE_UINT8: /* 8-bit unsigned integer */
- sval = make_shared(node->value()->uint8(), SR_UINT8_T);
- //sval = make_shared(node->value()->uint8());
- break;
- case LY_TYPE_INT16: /* 16-bit signed integer */
- sval = make_shared(node->value()->int16(), SR_INT16_T);
- //sval = make_shared(node->value()->int16());
- break;
- case LY_TYPE_UINT16: /* 16-bit unsigned integer */
- sval = make_shared(node->value()->uint16(), SR_UINT16_T);
- //sval = make_shared(node->value()->uint16());
- break;
- case LY_TYPE_INT32: /* 32-bit signed integer */
- sval = make_shared(node->value()->int32(), SR_INT32_T);
- //sval = make_shared(node->value()->int32());
- break;
- case LY_TYPE_UINT32: /* 32-bit unsigned integer */
- sval = make_shared(node->value()->uintu32(), SR_UINT32_T);
- //sval = make_shared(node->value()->uintu32());
- break;
- case LY_TYPE_INT64: /* 64-bit signed integer */
- sval = make_shared(node->value()->int64(), SR_INT64_T);
- break;
- case LY_TYPE_UINT64: /* 64-bit unsigned integer */
- sval = make_shared(node->value()->uint64(), SR_UINT64_T);
- //sval = make_shared(node->value()->uint64());
- break;
- case LY_TYPE_IDENT: /* A reference to an abstract identity */
- {
- string str(node->value()->ident()->module()->name());
- str.append(":");
- str.append(node->value()->ident()->name());
- sval = make_shared(str.c_str(), SR_IDENTITYREF_T);
- break;
+ UpdateTransaction xact;
+
+ if (xpath.compare("/*") != 0 && op.compare("replace") == 0) {
+ // Check if the xpath we are replacing is a leaf-list or a list
+ auto node_type = sr_sess.getContext().findPath(xpath).nodeType();
+ if (node_type == libyang::NodeType::Leaflist || node_type == libyang::NodeType::List) {
+ // Replacing list or leaflist means we should delete all previous entries
+ auto created_nodes = sr_sess.getContext().newPath2(xpath, std::nullopt, libyang::CreationOptions::Opaque);
+ auto del_parent = created_nodes.createdParent.value();
+ auto del_node = created_nodes.createdNode.value();
+ if (del_node.isOpaque()) {
+ del_node.newAttrOpaqueJSON("sysrepo", "operation", "purge");
+ } else {
+ auto sr_mod = sr_sess.getContext().getModuleImplemented("sysrepo").value();
+ // libyang treats NULL as a valid value for some data types
+ del_node.newMeta(sr_mod, "sysrepo:operation", "purge");
+ }
+ xact.push(del_parent);
}
- case LY_TYPE_ENUM: /* Enumerated strings */
- sval = make_shared(node->value()->enm()->name(), SR_ENUM_T);
- break;
- case LY_TYPE_EMPTY: /* A leaf that does not have any value */
- sval = make_shared(nullptr, SR_LEAF_EMPTY_T);
- break;
- case LY_TYPE_LEAFREF: /* A reference to a leaf instance */
- {
- //run again this function
- S_Data_Node_Leaf_List leaf
- = make_shared(node->value()->leafref());
- storeLeaf(leaf);
- break;
- }
-
-/* Unsupported types */
- case LY_TYPE_BITS: /* A set of bits or flags */
- BOOST_LOG_TRIVIAL(warning) << "Unsupported BITS type";
- throw std::invalid_argument("Unsupported BITS type");
- break;
- case LY_TYPE_INST: /* References a data tree node */
- BOOST_LOG_TRIVIAL(warning) << "Unsupported INSTANCE-IDENTIFIER type" << endl;
- throw std::invalid_argument("Unsupported INSTANCE-IDENTIFIER type");
- break;
- case LY_TYPE_UNION: /* Choice of member types */
- BOOST_LOG_TRIVIAL(warning) << "Unsupported UNION type";
- throw std::invalid_argument("Unsupported UNION type");
- break;
- case LY_TYPE_DER: /* Derived type */
- BOOST_LOG_TRIVIAL(warning) << "Unsupported DERIVED type";
- throw std::invalid_argument("Unsupported DERIVED type");
- break;
- case LY_TYPE_UNKNOWN: /* Unknown type (used in edit-config leaves) */
- BOOST_LOG_TRIVIAL(warning) << "Unsupported UNKNOWN type";
- throw std::invalid_argument("Unsupported UNKNOWN type");
- break;
- default:
- BOOST_LOG_TRIVIAL(warning) << "UNKNOWN type";
- throw std::invalid_argument("Unknown type");
- }
-
- try {
- sr_sess->set_item(node->path().c_str(), sval);
- } catch (exception &exc) {
- BOOST_LOG_TRIVIAL(warning) << exc.what();
- throw; //rethrow as caught
}
-}
-void Encode::storeTree(libyang::S_Data_Node node)
-{
- for (auto it : node->tree_dfs()) {
- /* Run through the entire tree, including siblinigs */
-
- switch(it->schema()->nodetype()) {
- case LYS_LEAF: //Only LEAF & LEAF LIST hold values in sysrepo
- {
- S_Data_Node_Leaf_List itleaf = make_shared(it);
-
- try {
- storeLeaf(itleaf);
- } catch (std::string str) { //triggered by sysepo::Val constructor
- BOOST_LOG_TRIVIAL(error) << str;
- throw invalid_argument("Internal error with JSON encoding");
- }
- break;
+ auto [status, node] = decode(xpath, reqval, EncodePurpose::Set);
+ if (!status.ok())
+ return std::make_tuple(status, std::nullopt);
+
+ auto root_node = node;
+ auto edit_node = node;
+
+ auto ietf_nc_mod = sr_sess.getContext().getModuleImplemented("ietf-netconf").value();
+ if (xpath.compare("/*") == 0) {
+ if (op.compare("replace") == 0) {
+ // The gNMI semantics are that a replace at the top-level should cause all data node not provided to be removed.
+ // However, sysrepo semantics are that only the provided nodes are replaced. Therefore, request that everything
+ // not being replaced is deleted.
+
+ auto del_root = sr_sess.getData(xpath.c_str(), 1);
+ // Walk all siblings not in update and add delete node to them
+ for (auto n = std::optional(del_root); n.has_value(); n = n->nextSibling()) {
+ if (getRawNode(*n)->flags & LYD_DEFAULT) {
+ // Default nodes need not be deleted and can be skipped
+ continue;
}
- case LYS_LEAFLIST: //Only LEAF & LEAF LIST hold values in sysrepo
- BOOST_LOG_TRIVIAL(warning) << "Unsupported leaf-list: " << it->path();
- //sysrepo does not seem to support leaf lists
- break;
-
- case LYS_LIST: //A list instance must be created before populating leaves
- {
- try {
- shared_ptr sval = make_shared(nullptr, SR_LIST_T);
- sr_sess->set_item(it->path().c_str(), sval);
- } catch (exception &exc) {
- BOOST_LOG_TRIVIAL(warning) << exc.what();
- throw; //rethrow as caught
+ bool is_replace_node = false;
+ // Is this node a replace node?
+ for (auto repl_n = edit_node; repl_n.has_value(); repl_n = repl_n->nextSibling()) {
+ if (n->schema().path() == repl_n->schema().path()) {
+ is_replace_node = true;
+ break;
}
+ }
- break;
+ // If this is a replace node, then optimise further sysrepo processing by not adding it to the batch
+ if (!is_replace_node) {
+ n->newMeta(ietf_nc_mod, "ietf-netconf:operation", "remove");
+ xact.push_one(*n);
}
+ }
+ }
+
+ // Add operation attribute to each node - there can be multiple if the JSON contains multiple top-level nodes.
+ for (auto n = edit_node; n.has_value(); n = n->nextSibling()) {
+ n->newMeta(ietf_nc_mod, "ietf-netconf:operation", op);
+ }
+ if (edit_node.has_value()) {
+ xact.push(edit_node.value());
+ }
+ root_node = xact.first_node;
+
+ } else {
+ // Find the edit point for the data fragment
+ auto set = root_node->findXPath(xpath.c_str());
+ // We should have found a path, and wildcards don't make sense
+ if (set.empty()) {
+ BOOST_LOG_TRIVIAL(error) << "Empty result searching for "
+ << xpath.c_str();
+ throw invalid_argument("invalid set returned for xpath \"" + xpath + "\"");
+ }
- default:
- break;
+ for (auto edit_node : set) {
+ edit_node.newMeta(ietf_nc_mod, "ietf-netconf:operation", op);
+ BOOST_LOG_TRIVIAL(debug) << op.c_str() << " path: " << edit_node.path();
}
+ xact.push(root_node.value());
+ root_node = xact.first_node;
}
+
+ return std::make_tuple(Status::OK, root_node);
}
+grpc::Status Encode::encode(Encoding encoding, libyang::DataNode node, TypedValue *val)
+{
+ switch (encoding) {
+ case gnmi::JSON:
+ case gnmi::JSON_IETF:
+ val->set_json_ietf_val(json_encode(node));
+ break;
+ default:
+ BOOST_LOG_TRIVIAL(warning) << "Unsupported Encoding "
+ << Encoding_Name(encoding);
+ return Status(StatusCode::UNIMPLEMENTED, Encoding_Name(encoding));
+ }
+
+ return Status::OK;
+}
diff --git a/src/gnmi/encode/encode.h b/src/gnmi/encode/encode.h
index 76e5a8a..d46b448 100644
--- a/src/gnmi/encode/encode.h
+++ b/src/gnmi/encode/encode.h
@@ -1,5 +1,6 @@
/*
* Copyright 2020 Yohan Pipereau
+ * Copyright 2025 Graphiant Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,10 +18,9 @@
#ifndef _ENCODE_H
#define _ENCODE_H
-#include
+#include
#include
-
-#include
+#include
using std::shared_ptr;
using std::string;
@@ -32,19 +32,37 @@ using std::vector;
* It provides YANG validation before storing elements and after fetching them
* in sysrepo.
*
- * -update() CREATE & UPDATE
- * -read() READ
+ * update() CREATE & UPDATE
+ * read() READ
*
* DELETE is not supported as it is not dependent of encodings.
- * Use sr_delete_item to suppress subtree from a xpath directly.
+ * Use sr_deleteItem to suppress subtree from a xpath directly.
*/
-struct JsonData {
- JsonData() {}
- /* Field containing a YANG list key [name=value] */
- std::pair key;
- /* Field containing the JSON tree under the designed YANG element */
- string data;
+/* helper class to reset session datastore on going out of scope */
+class SessionDsSwitcher {
+ public:
+ SessionDsSwitcher(sysrepo::Session sess, sysrepo::Datastore ds)
+ : sr_sess(sess)
+ {
+ orig_ds = sr_sess.activeDatastore();
+ sr_sess.switchDatastore(ds);
+ }
+ ~SessionDsSwitcher()
+ {
+ sr_sess.switchDatastore(orig_ds);
+ }
+ private:
+ sysrepo::Session sr_sess;
+ sysrepo::Datastore orig_ds;
+};
+
+/*
+ * Purpose for the encode/decode
+ */
+enum class EncodePurpose {
+ Set,
+ Rpc,
};
/*
@@ -53,26 +71,32 @@ struct JsonData {
*/
class Encode {
public:
- Encode(std::shared_ptr sr_sess);
- ~Encode();
+ Encode(sysrepo::Session sess)
+ : sr_sess(sess)
+ {
+ }
+
+ void set_log_id(uint64_t id) {
+ log_id = id;
+ sr_session_set_nc_id(sysrepo::getRawSession(sr_sess), id);
+ }
/* Supported Encodings */
enum Supported {
JSON_IETF = 0,
};
- /* JSON encoding */
- void json_update(string data);
- vector json_read(string xpath);
+ std::tuple> decode(string xpath, const gnmi::TypedValue &reqval, EncodePurpose purpose);
+ std::tuple> update(string xpath, const gnmi::TypedValue &reqval, string op);
+ grpc::Status encode(gnmi::Encoding encoding, libyang::DataNode node, gnmi::TypedValue *val);
- private:
- void storeTree(libyang::S_Data_Node node);
- void storeLeaf(libyang::S_Data_Node_Leaf_List node);
+ /* JSON encoding */
+ std::optional json_decode(string xpath, string data, EncodePurpose purpose);
+ string json_encode(libyang::DataNode node);
private:
- std::shared_ptr ctx;
- std::shared_ptr sr_sess;
- sysrepo::S_Subscribe sub; //must be out of constructor to recv callback
+ sysrepo::Session sr_sess;
+ uint64_t log_id = 0;
};
#endif //_ENCODE_H
diff --git a/src/gnmi/encode/json_ietf.cpp b/src/gnmi/encode/json_ietf.cpp
index 01a568c..caf480a 100644
--- a/src/gnmi/encode/json_ietf.cpp
+++ b/src/gnmi/encode/json_ietf.cpp
@@ -1,5 +1,6 @@
/*
* Copyright 2020 Yohan Pipereau
+ * Copyright 2025 Graphiant Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,12 +15,12 @@
* limitations under the License.
*/
-#include
-#include
-#include
-#include
+#include
+#include
#include
+#include
+#include
#include "encode.h"
@@ -31,156 +32,132 @@ using namespace libyang;
* CRUD - UPDATE *
*****************/
+std::string stripQuotes(const std::string& str) {
+ if (str.front() == '\"' && str.back() == '\"') {
+ return str.substr(1, str.length() - 2);
+ }
+ return str;
+}
+
/*
* Parse a message encoded in JSON IETF and set fields in sysrepo.
* @param data Input data encoded in JSON
*/
-void Encode::json_update(string data)
+std::optional Encode::json_decode(string xpath, string data, EncodePurpose purpose)
{
- S_Data_Node node;
-
- /* Parse input JSON, same options than netopeer2 edit-config */
- node = ctx->parse_data_mem(data.c_str(), LYD_JSON, LYD_OPT_EDIT |
- LYD_OPT_STRICT);
+ // Request to fail if the data doesn't match the schema
+ auto metadata = "XPath: " + xpath + ". InputData";
+ log_to_file(data, metadata, log_id);
+
+ if (xpath.compare("/*") == 0) {
+ try {
+ auto ctx = sr_sess.getContext();
+ return ctx.parseData(data, DataFormat::JSON, ParseOptions::ParseOnly | ParseOptions::Strict, std::nullopt);
+ } catch (const exception &exc) {
+ BOOST_LOG_TRIVIAL(error) << "Failed to parse data:" << obfs_data(data)
+ << ". Exception: " << exc.what();
+ // Don't leave the error lying around on the context otherwise sysrepo may pick it up on an unrelated operation
+ auto ctx = sr_sess.getContext();
+ const_cast(&ctx)->cleanAllErrors();
+ throw invalid_argument(exc.what());
+ }
+ }
- /* store Data Tree to sysrepo */
- storeTree(node);
-}
+ std::optional root_node;
-/***************
- * CRUD - READ *
- ***************/
+ // Create a node tree according to the xpath. The data is passed in because libyang makes this mandatory for leaf
+ // nodes - it will be ignored for other node types (we cannot easily know what the node type is ahead of time).
+ try {
+ data = stripQuotes(data);
+ auto schema_node = sr_sess.getContext().findPath(xpath);
+ auto node_type = schema_node.nodeType();
+ auto opts = CreationOptions::Update;
-static Json::Value json_tree(sysrepo::S_Tree tree)
-{
- sysrepo::S_Tree iter;
- Json::Value val;
-
- // run through all siblings
- for (iter = tree->first_child(); iter != nullptr; iter = iter->next()) {
- //create sibling with "node" as a parent
- switch (iter->type()) { //follows RFC 7951
- /* JSON Number */
- case SR_UINT8_T:
- val[iter->name()] = iter->data()->get_uint8();
- break;
- case SR_UINT16_T:
- val[iter->name()] = iter->data()->get_uint16();
- break;
- case SR_UINT32_T:
- val[iter->name()] = iter->data()->get_uint32();
- break;
- case SR_INT8_T:
- val[iter->name()] = iter->data()->get_int8();
- break;
- case SR_INT16_T:
- val[iter->name()] = iter->data()->get_int16();
- break;
- case SR_INT32_T:
- val[iter->name()] = iter->data()->get_int32();
- break;
+ if (node_type == libyang::NodeType::Leaflist) {
+ opts = opts | CreationOptions::IgnoreInvalidValue;
+ }
- /* JSON string */
- case SR_STRING_T:
- val[iter->name()] = iter->data()->get_string();
- break;
- case SR_INT64_T:
- val[iter->name()] = to_string(iter->data()->get_int64());
- break;
- case SR_UINT64_T:
- val[iter->name()] = to_string(iter->data()->get_uint64());
- break;
- case SR_DECIMAL64_T:
- val[iter->name()] = to_string(iter->data()->get_decimal64());
- break;
- case SR_IDENTITYREF_T:
- val[iter->name()] = iter->data()->get_identityref();
- break;
- case SR_INSTANCEID_T:
- val[iter->name()] = iter->data()->get_identityref();
- break;
- case SR_BINARY_T:
- val[iter->name()] = iter->data()->get_binary();
- break;
- case SR_BITS_T:
- val[iter->name()] = iter->data()->get_bits();
- break;
- case SR_ENUM_T:
- val[iter->name()] = iter->data()->get_enum();
- break;
- case SR_BOOL_T:
- val[iter->name()] = iter->data()->get_bool() ? "true" : "false";
- break;
+ if (node_type == libyang::NodeType::Leaf) {
+ // For empty leaf, libyang expects "" and not "[null]"
+ auto base_type = schema_node.asLeaf().valueType().base();
+ if (base_type == libyang::LeafBaseType::Empty && data == "[null]") {
+ data = "";
+ }
+ } else {
+ // Only non-Leaf nodes can be opaque
+ opts = opts | CreationOptions::Opaque;
+ }
- /* JSON arrays */
- case SR_LIST_T:
- val[iter->name()].append(json_tree(iter));
- break;
- case SR_LEAF_EMPTY_T:
- val[iter->name()].append("null");
- break;
+ auto created_nodes = sr_sess.getContext().newPath2(xpath, data, opts);
+ root_node = created_nodes.createdParent;
+ if (created_nodes.createdNode->schema().nodeType() == NodeType::Leaf) {
+ /* If it is a leaf node, we are done here */
+ return root_node;
+ }
+ } catch (const exception &exc) {
+ BOOST_LOG_TRIVIAL(error) << "Failed to create node:" << xpath.c_str()
+ << "Exception: " << exc.what();
+ // Don't leave the error lying around on the context otherwise sysrepo may pick it up on an unrelated operation
+ auto ctx = sr_sess.getContext();
+ const_cast(&ctx)->cleanAllErrors();
+ throw;
+ }
- /* nested JSON */
- case SR_CONTAINER_T:
- case SR_CONTAINER_PRESENCE_T:
- val[iter->name()] = json_tree(iter);
- break;
+ // Now find the edit point for the data fragment
+ auto set = root_node->findXPath(xpath);
+ // We should have found a path, and wildcards don't make sense
+ if (set.size() != 1)
+ throw invalid_argument("invalid set returned for xpath \"" + xpath + "\"");
- /* Unsupported types */
- case SR_ANYDATA_T:
- case SR_ANYXML_T:
- throw invalid_argument("unsupported ANYDATA and ANYXML types");
- break;
+ auto edit_node = set.front();
- default:
- BOOST_LOG_TRIVIAL(error) << "Unknown tree node type";
- throw invalid_argument("Unknown tree node type");
- }
+ try {
+ if (purpose == EncodePurpose::Rpc) {
+ edit_node.parseOp(data.c_str(), DataFormat::JSON, OperationType::RpcYang);
+ } else {
+ // Parse input JSON, expecting a fragment
+ edit_node.parseData(data.c_str(), DataFormat::JSON, ParseOptions::ParseOnly | ParseOptions::Strict | ParseOptions::BareTopLeaf);
+ }
+ } catch (const exception &exc) {
+ // Don't leave the error lying around on the context otherwise sysrepo may pick it up on an unrelated operation
+ auto ctx = sr_sess.getContext();
+ const_cast(&ctx)->cleanAllErrors();
+ BOOST_LOG_TRIVIAL(error) << "Failed to parse data. xpath: " << xpath
+ << ", data:" << obfs_data(data)
+ << ". Exception: " << exc.what();
+ throw;
}
- return val;
+
+ return root_node;
}
-/* Get sysrepo subtree data corresponding to XPATH */
-vector Encode::json_read(string xpath)
+/***************
+ * CRUD - READ *
+ ***************/
+
+/* Encode a libyang data node into JSON form */
+string Encode::json_encode(libyang::DataNode node)
{
- sysrepo::S_Trees sr_trees;
- sysrepo::S_Tree sr_tree;
- vector json_vec;
- Json::StyledWriter styledwriter; //pretty JSON
- Json::FastWriter fastWriter; //unreadable JSON
- Json::Value val;
- JsonData tmp;
- string key_name, key_value;
-
- BOOST_LOG_TRIVIAL(debug) << "read and encode in json data for " << xpath;
-
- /* Get multiple subtree for YANG lists or one for other YANG types */
- sr_trees = sr_sess->get_subtrees(xpath.c_str());
- if (sr_trees == nullptr)
- throw invalid_argument("xpath not found");
-
- for (size_t i = 0; i < sr_trees->tree_cnt(); i++) {
- sr_tree = sr_trees->tree(i);
- val = json_tree(sr_tree);
-
- /*
- * Pass a pair containing key name and key value.
- * keys are always first element of children in sysrepo trees
- */
- if (sr_tree->type() == SR_LIST_T) {
- tmp.key.first = string(sr_tree->first_child()->name());
- tmp.key.second = val[tmp.key.first].asString();
- BOOST_LOG_TRIVIAL(debug) << tmp.key.first << ":" << tmp.key.second;
+ string data;
+
+ if (node.schema().nodeType() == NodeType::Leaf)
+ data = node.printStr(DataFormat::JSON, PrintFlags::BareTopLeaf).value();
+ else if (node.schema().nodeType() == NodeType::Leaflist)
+ // FIXME: this is not correct - we don't have the current value of the whole leaflist
+ // available and, in addition, libyang doesn't return plausible JSON for it with any
+ // combination of flags
+ data = "[]";
+ else {
+ // In case the node has no children
+ data = "{}";
+ // The xpath will have found the containing node, but we want to dump its children according to gNMI rules
+ if (node.child().has_value()) {
+ for (auto it : node.child()->childrenDfs()) {
+ data = it.printStr(DataFormat::JSON, PrintFlags::WithSiblings | PrintFlags::Shrink | PrintFlags::Fragment).value();
+ break;
+ }
}
-
- /* Print Pretty JSON message */
- BOOST_LOG_TRIVIAL(debug) << styledwriter.write(val);
-
- /* Fast unreadable JSON message */
- tmp.data = fastWriter.write(val);
-
- json_vec.push_back(tmp);
}
- return json_vec;
+ return data;
}
diff --git a/src/gnmi/encode/load_models.cpp b/src/gnmi/encode/load_models.cpp
deleted file mode 100644
index 78800b6..0000000
--- a/src/gnmi/encode/load_models.cpp
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright 2020 Yohan Pipereau
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include
-#include
-
-#include
-
-#include "encode.h"
-#include "runtime.h"
-
-using namespace std;
-using namespace libyang;
-
-/*
- * @brief Fetch all modules implemented in sysrepo datastore
- */
-Encode::Encode(shared_ptr sess)
- : sr_sess(sess)
-{
- shared_ptr schemas; //sysrepo YANG schemas supported
- shared_ptr scb; //pointer to callback class
- sub = make_shared(sr_sess); //sysrepo subscriptions
- S_Module mod;
- string str;
-
- //Libyang log level should be ERROR only
- set_log_verbosity(LY_LLERR);
-
- /* 1. build libyang context */
- ctx = make_shared();
-
- /* Instantiate Callback class */
- scb = make_shared(ctx, sess);
-
- /* 2. get the list of schemas from sysrepo */
- try {
- schemas = sr_sess->list_schemas();
- } catch (const exception &exc) {
- BOOST_LOG_TRIVIAL(error) << exc.what();
- exit(1);
- }
-
- /* 3.1 Callback for missing modules */
- auto mod_c_cb = [this](const char *mod_name, const char *mod_rev,
- const char *, const char *) -> libyang::Context::mod_missing_cb_return {
- string str; S_Module mod;
-
- BOOST_LOG_TRIVIAL(debug) << "Importing missing dependency " << mod_name;
- str = this->sr_sess->get_schema(mod_name, mod_rev, NULL, SR_SCHEMA_YANG);
-
- try {
- mod = this->ctx->parse_module_mem(str.c_str(), LYS_IN_YANG);
- } catch (const exception &exc) {
- BOOST_LOG_TRIVIAL(warning) << exc.what();
- }
-
- return {LYS_IN_YANG, mod_name};
- };
-
- /* 3.2 register callback for missing YANG module */
- ctx->add_missing_module_callback(mod_c_cb);
-
- /* 4. Initialize our libyang context with modules and features
- * already loaded in sysrepo */
- for (unsigned int i = 0; i < schemas->schema_cnt(); i++) {
- string module_name = schemas->schema(i)->module_name();
- string revision = schemas->schema(i)->revision()->revision();
-
- mod = ctx->get_module(module_name.c_str(), revision.c_str());
- if (mod != nullptr) {
- BOOST_LOG_TRIVIAL(debug) << "Module was already loaded: "
- << module_name << "@" << revision;
- } else {
- BOOST_LOG_TRIVIAL(debug) << "Download & parse module: "
- << module_name << "@" << revision;
-
- /* 4.1 Download YANG model from sysrepo as in YANG format and parse it */
- try {
- str = sr_sess->get_schema(module_name.c_str(), revision.c_str(), NULL,
- SR_SCHEMA_YANG);
- mod = ctx->parse_module_mem(str.c_str(), LYS_IN_YANG);
- } catch (const exception &exc) {
- BOOST_LOG_TRIVIAL(warning) << exc.what();
- continue;
- }
- }
-
- /* 4.2 Load features loaded in sysrepo */
- for (size_t j = 0; j < schemas->schema(i)->enabled_feature_cnt(); j++) {
- string feature_name = schemas->schema(i)->enabled_features(j);
-
- BOOST_LOG_TRIVIAL(debug) << "Loading feature " << feature_name
- << " in module " << mod->name();
-
- mod->feature_enable(feature_name.c_str());
- }
- }
-
- /* 5. subscribe for notifications about new modules */
- sub->module_install_subscribe(scb, ctx.get(), sysrepo::SUBSCR_DEFAULT);
-
- /* 6. subscribe for changes of features state */
- sub->feature_enable_subscribe(scb);
-}
-
-Encode::~Encode()
-{
- BOOST_LOG_TRIVIAL(info) << "Disconnect sysrepo session and Libyang context";
-}
diff --git a/src/gnmi/encode/runtime.cpp b/src/gnmi/encode/runtime.cpp
deleted file mode 100644
index 8ea4ea9..0000000
--- a/src/gnmi/encode/runtime.cpp
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Copyright 2020 Yohan Pipereau
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "runtime.h"
-
-#include
-
-using namespace std;
-using namespace libyang;
-
-static void print_loaded_module(std::shared_ptr ctx)
-{
- cout << "=================================================="
- << endl;
- for (auto it : ctx->get_module_iter())
- cout << string(it->name()) << endl;
- cout << "=================================================="
- << endl;
-}
-
-/* install - download module and load it in our libyang context. */
-void RuntimeSrCallback::install(const char *module_name, const char *revision)
-{
- libyang::S_Module mod;
- string str;
-
- /* Is module already loaded with libyang? */
- mod = ctx->get_module(module_name, revision);
- if (mod != nullptr) {
- BOOST_LOG_TRIVIAL(debug) << "Module was already loaded: "
- << module_name << "@" << revision;
- return;
- }
-
- /* Download module from sysrepo */
- try {
- BOOST_LOG_TRIVIAL(debug) << "Download " << module_name << " from sysrepo";
- str = sr_sess->get_schema(module_name, revision, nullptr, SR_SCHEMA_YANG);
- } catch (const exception &exc) {
- BOOST_LOG_TRIVIAL(warning) << exc.what();
- return;
- }
-
- /* parse module */
- try {
- BOOST_LOG_TRIVIAL(debug) << "Parse " << module_name << " with libyang";
- mod = ctx->parse_module_mem(str.c_str(), LYS_IN_YANG);
- } catch (const exception &exc) {
- BOOST_LOG_TRIVIAL(warning) << exc.what();
- return;
- }
-}
-
-/* module_install - Actions performed after sysrepo install/uninstall module
- * event. */
-void
-RuntimeSrCallback::module_install(const char *module_name, const char *revision,
- sr_module_state_t state, void *private_ctx)
-{
- (void)private_ctx;
-
- if (ctx == nullptr) {
- BOOST_LOG_TRIVIAL(error) << "Context can not be null";
- return;
- }
-
- switch (state) {
- case SR_MS_UNINSTALLED:
- BOOST_LOG_TRIVIAL(warning) << "Impossible to remove a module at runtime";
- break;
-
- case SR_MS_IMPORTED:
- case SR_MS_IMPLEMENTED:
- BOOST_LOG_TRIVIAL(info) << "Install " << module_name;
- install(module_name, revision);
- print_loaded_module(ctx);
- break;
-
- default:
- BOOST_LOG_TRIVIAL(error) << "Unknown state";
- }
-}
-
-void
-RuntimeSrCallback::feature_enable(const char *module_name,
- const char *feature_name, bool enable,
- void *private_ctx)
-{
- (void)private_ctx; (void) enable;
- BOOST_LOG_TRIVIAL(warning) << "Impossible to enable/disable feature "
- << string(feature_name) << " of "
- << string(module_name)
- << " at runtime";
-}
-
diff --git a/src/gnmi/encode/runtime.h b/src/gnmi/encode/runtime.h
deleted file mode 100644
index de75327..0000000
--- a/src/gnmi/encode/runtime.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2020 Yohan Pipereau
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef _RUNTIME_H
-#define _RUNTIME_H
-
-#include
-#include
-
-/*
- * RuntimeSrCallback - Class defining callbacks to perform installation of
- * module, enablement of feature in sysrepo-gnxi libyang context.
- * It is triggered by sysrepo events like module installation, feature
- * enablement
- */
-class RuntimeSrCallback : public sysrepo::Callback {
- public:
- RuntimeSrCallback(std::shared_ptr context,
- std::shared_ptr sess)
- : ctx(context), sr_sess(sess) {}
-
- void module_install(const char *module_name, const char *revision,
- sr_module_state_t state, void *private_ctx) override;
-
- void feature_enable(const char *module_name, const char *feature_name,
- bool enable, void *private_ctx) override;
-
- private:
- void install(const char *module_name, const char *revision);
-
- private:
- std::shared_ptr ctx;
- std::shared_ptr sr_sess;
-};
-
-#endif //_RUNTIME_H
diff --git a/src/gnmi/get.cpp b/src/gnmi/get.cpp
index 66adac5..f8f6e75 100644
--- a/src/gnmi/get.cpp
+++ b/src/gnmi/get.cpp
@@ -1,5 +1,6 @@
/*
* Copyright 2020 Yohan Pipereau
+ * Copyright 2025 Graphiant Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,65 +21,41 @@
#include "encode/encode.h"
#include
#include
+#include
using namespace std;
using google::protobuf::RepeatedPtrField;
-using sysrepo::sysrepo_exception;
namespace impl {
Status
Get::BuildGetUpdate(RepeatedPtrField* updateList,
- const Path &path, string fullpath,
- gnmi::Encoding encoding)
+ string fullpath, gnmi::Encoding encoding)
{
- Update *update;
- TypedValue *gnmival;
- vector json_vec;
- string *json_ietf;
- int idx;
- google::protobuf::Map *key;
-
- /* Refresh configuration data from current session */
- sr_sess->refresh();
-
- /* Create appropriate TypedValue message based on encoding */
- switch (encoding) {
- case gnmi::JSON:
- case gnmi::JSON_IETF:
- /* Get sysrepo subtree data corresponding to XPATH */
- try {
- json_vec = encodef->json_read(fullpath);
- } catch (invalid_argument &exc) {
- return Status(StatusCode::NOT_FOUND, exc.what());
- } catch (sysrepo_exception &exc) {
- BOOST_LOG_TRIVIAL(error) << "Fail getting items from sysrepo: "
- << exc.what();
- return Status(StatusCode::INVALID_ARGUMENT, exc.what());
+ try {
+ /* Get multiple subtree for YANG lists or one for other YANG types */
+ auto sr_trees = sr_sess.getData(fullpath.c_str());
+ /* The path not (yet) existing isn't an error, so just return an empty set */
+ if (!sr_trees.has_value())
+ return Status::OK;
+
+ for (auto n : sr_trees->findXPath(fullpath.c_str())) {
+ auto update = updateList->Add();
+ node_get_gnmi_path(n, *update->mutable_path());
+ auto status = encodef->encode(encoding, n, update->mutable_val());
+ if (!status.ok()) {
+ updateList->Clear();
+ return status;
}
-
- /* Create new update message for every tree collected */
- for (auto it : json_vec) {
- update = updateList->Add();
- update->mutable_path()->CopyFrom(path);
-
- if (!it.key.first.empty()) {
- BOOST_LOG_TRIVIAL(debug) << "putting list entries key in gNMI path";
- idx = update->mutable_path()->elem_size() - 1;
- key = update->mutable_path()->mutable_elem(idx)->mutable_key();
- (*key)[it.key.first] = it.key.second;
- }
-
- gnmival = update->mutable_val();
-
- json_ietf = gnmival->mutable_json_ietf_val();
- *json_ietf = it.data;
- }
-
- break;
-
- default:
- return Status(StatusCode::UNIMPLEMENTED, Encoding_Name(encoding));
+ }
+ } catch (invalid_argument &exc) {
+ updateList->Clear();
+ return Status(StatusCode::NOT_FOUND, exc.what());
+ } catch (sysrepo::ErrorWithCode &exc) {
+ BOOST_LOG_TRIVIAL(error) << "Fail getting items from sysrepo: "
+ << exc.what();
+ updateList->Clear();
+ return Status(StatusCode::INVALID_ARGUMENT, exc.what());
}
return Status::OK;
@@ -95,33 +72,51 @@ Get::BuildGetUpdate(RepeatedPtrField* updateList,
* gNMI so deleted path in Notification message will always be empty.
*/
Status
-Get::BuildGetNotification(Notification *notification, const Path *prefix,
- const Path &path, gnmi::Encoding encoding)
+Get::BuildGetNotification(Notification *notification, const Path &prefix,
+ const Path &path, gnmi::Encoding encoding,
+ gnmi::GetRequest_DataType dataType)
{
/* Data elements that have changed values */
RepeatedPtrField* updateList = notification->mutable_update();
string fullpath = "";
+ auto ds = sysrepo::Datastore::Operational;
/* Get time since epoch in milliseconds */
notification->set_timestamp(get_time_nanosec());
- /* Put Request prefix as Response prefix */
- if (prefix != nullptr) {
- string str = gnmi_to_xpath(*prefix);
+ if (prefix.elem_size() > 0 || prefix.target().compare("")) {
+ string str;
+ try {
+ str = gnmi_to_xpath(prefix);
+ } catch (invalid_argument &exc) {
+ return Status(StatusCode::INVALID_ARGUMENT, exc.what());
+ }
BOOST_LOG_TRIVIAL(debug) << "prefix is " << str;
- notification->mutable_prefix()->CopyFrom(*prefix);
- fullpath += str;
+ // gNMI spec §2.2.2.1:
+ // When set in the prefix in a request, GetRequest, SetRequest or
+ // SubscribeRequest, the field MUST be reflected in the prefix of the
+ // corresponding GetResponse, SetResponse or SubscribeResponse by a
+ // server.
+ notification->mutable_prefix()->set_target(prefix.target());
+ if (prefix.elem_size() > 0) {
+ fullpath += str;
+ }
}
- fullpath += gnmi_to_xpath(path);
+ try {
+ gnmi_check_origin(prefix, path);
+ fullpath += gnmi_to_xpath(path);
+ } catch (invalid_argument &exc) {
+ return Status(StatusCode::INVALID_ARGUMENT, exc.what());
+ }
BOOST_LOG_TRIVIAL(debug) << "GetRequest Path " << fullpath;
+ if (dataType == gnmi::GetRequest_DataType_CONFIG)
+ ds = sysrepo::Datastore::Running;
- /* TODO Check DATA TYPE in {ALL,CONFIG,STATE,OPERATIONAL}
- * This is interesting for NMDA architecture
- * req->type() : GetRequest_DataType_ALL,CONFIG,STATE,OPERATIONAL
- */
- return BuildGetUpdate(updateList, path, fullpath, encoding);
+ SessionDsSwitcher ds_switch(sr_sess, ds);
+
+ return BuildGetUpdate(updateList, fullpath, encoding);
}
/* Verify request fields are correct */
@@ -180,11 +175,8 @@ Status Get::run(const GetRequest* req, GetResponse* response)
for (auto path : req->path()) {
notification = notificationList->Add();
- if (req->has_prefix())
- status = BuildGetNotification(notification, &req->prefix(), path, req->encoding());
- else
- status = BuildGetNotification(notification, nullptr, path, req->encoding());
-
+ status = BuildGetNotification(notification, req->prefix(), path,
+ req->encoding(), req->type());
if (!status.ok()) {
BOOST_LOG_TRIVIAL(error) << "Fail building get notification: "
<< status.error_message();
diff --git a/src/gnmi/get.h b/src/gnmi/get.h
index 9de1159..0f1e93c 100644
--- a/src/gnmi/get.h
+++ b/src/gnmi/get.h
@@ -1,5 +1,6 @@
/*
* Copyright 2020 Yohan Pipereau
+ * Copyright 2025 Graphiant Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +20,7 @@
#include
-#include
+#include
#include "encode/encode.h"
using namespace gnmi;
@@ -31,21 +32,24 @@ namespace impl {
class Get {
public:
- Get(sysrepo::S_Session sess, std::shared_ptr encode)
- : sr_sess(sess), encodef(encode) {}
+ Get(sysrepo::Session sess)
+ : sr_sess(sess)
+ {
+ encodef = std::make_shared(sr_sess);
+ }
~Get() {}
Status run(const GetRequest* req, GetResponse* response);
private:
- Status BuildGetNotification(Notification *notification, const Path *prefix,
- const Path &path, gnmi::Encoding encoding);
+ Status BuildGetNotification(Notification *notification, const Path &prefix,
+ const Path &path, gnmi::Encoding encoding,
+ gnmi::GetRequest_DataType dataType);
Status BuildGetUpdate(RepeatedPtrField* updateList,
- const Path &path, string fullpath,
- gnmi::Encoding encoding);
+ string fullpath, gnmi::Encoding encoding);
private:
- sysrepo::S_Session sr_sess; //sysrepo session
+ sysrepo::Session sr_sess; //sysrepo session
shared_ptr encodef; //support for json ietf encoding
};
diff --git a/src/gnmi/gnmi.cpp b/src/gnmi/gnmi.cpp
index b141310..7967d87 100644
--- a/src/gnmi/gnmi.cpp
+++ b/src/gnmi/gnmi.cpp
@@ -1,5 +1,6 @@
/*
* Copyright 2020 Yohan Pipereau
+ * Copyright 2025 Graphiant Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,33 +20,90 @@
#include "get.h"
#include "set.h"
#include "subscribe.h"
+#include "confirm.h"
+#include "rpc.h"
-Status GNMIService::Set(ServerContext *context, const SetRequest* request,
- SetResponse* response)
+static std::atomic shutting_down;
+
+// cache server contexts for TryCancel on shutting down
+static std::set server_contexts;
+static std::mutex server_context_mutex;
+
+class ServerContextHolder {
+ public:
+ ServerContextHolder(ServerContext *ctx) : ctx(ctx) {
+ const std::lock_guard lock(server_context_mutex);
+ server_contexts.insert(ctx);
+ }
+ ~ServerContextHolder() {
+ const std::lock_guard lock(server_context_mutex);
+ server_contexts.erase(ctx);
+ }
+ private:
+ ServerContext *ctx;
+};
+
+void GNMIService::TryCancelAll(void)
+{
+ const std::lock_guard lock(server_context_mutex);
+ // forbid any new subscriptions by indicating we are shutting down
+ shutting_down.store(true);
+ for (auto ctx : server_contexts) {
+ ctx->TryCancel();
+ }
+ BOOST_LOG_TRIVIAL(debug) << "Sent cancellation to subscriptions";
+}
+
+Status GNMIService::Set(ServerContext *context, const SetRequest *request,
+ SetResponse *response)
{
(void)context;
- impl::Set rpc(sr_sess, encodef);
+ impl::Set rpc(sr_con.sessionStart(sysrepo::Datastore::Running), sr_con.sessionStart(sysrepo::Datastore::Startup), conf_state);
return rpc.run(request, response);
}
-Status GNMIService::Get(ServerContext *context, const GetRequest* request,
- GetResponse* response)
+Status GNMIService::Get(ServerContext *context, const GetRequest *request,
+ GetResponse *response)
{
(void)context;
- impl::Get rpc(sr_sess, encodef);
+ impl::Get rpc(sr_con.sessionStart(sysrepo::Datastore::Running));
return rpc.run(request, response);
}
-Status GNMIService::Subscribe(ServerContext* context,
- ServerReaderWriter* stream)
+Status GNMIService::Subscribe(ServerContext *context,
+ ServerReaderWriter *stream)
{
+ ServerContextHolder holder(context);
+
+ // If we are shutting down don't start any new subscriptions
+ // as TryCancelAll will not be called after this.
+ if (shutting_down.load()) {
+ BOOST_LOG_TRIVIAL(debug) << "Subscribe is not possible as server is shutting down";
+ return Status(StatusCode::UNAVAILABLE, string("Server is shutting down"));
+ }
+
SubscribeRequest request;
- impl::Subscribe rpc(sr_sess, encodef);
+ impl::Subscribe rpc(sr_con.sessionStart(sysrepo::Datastore::Running));
return rpc.run(context, stream);
+}
- return Status::OK;
+Status GNMIService::Confirm(ServerContext *context, const ConfirmRequest *request,
+ ConfirmResponse *response)
+{
+ (void)context;
+ impl::Confirm rpc(sr_con.sessionStart(sysrepo::Datastore::Startup), conf_state);
+
+ return rpc.run(request, response);
}
+Status GNMIService::Rpc(ServerContext *context, const RpcRequest *request,
+ RpcResponse *response)
+{
+ (void)context;
+ impl::Rpc rpc(sr_con.sessionStart(sysrepo::Datastore::Running));
+
+ return rpc.run(request, response);
+}
diff --git a/src/gnmi/gnmi.h b/src/gnmi/gnmi.h
index 027209e..78a83b4 100644
--- a/src/gnmi/gnmi.h
+++ b/src/gnmi/gnmi.h
@@ -1,5 +1,6 @@
/*
* Copyright 2020 Yohan Pipereau
+ * Copyright 2025 Graphiant Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,13 +18,17 @@
#ifndef _GNMI_SERVER_H
#define _GNMI_SERVER_H
+#include
+
#include
+#include
-#include
-#include
#include
+#include
#include "encode/encode.h"
+#include "confirm.h"
+#include "utils/log.h"
using namespace grpc;
using namespace gnmi;
@@ -36,17 +41,10 @@ using std::make_shared;
class GNMIService final : public gNMI::Service
{
public:
- GNMIService(string app) {
- try {
- sr_con = make_shared(app.c_str(), SR_CONN_DAEMON_REQUIRED);
- sr_sess = make_shared(sr_con);
- encodef = make_shared(sr_sess);
- } catch (sysrepo::sysrepo_exception &exc) {
- std::cerr << "Connection to sysrepo failed " << exc.what() << std::endl;
- exit(1);
- }
+ GNMIService(sysrepo::Connection conn) : sr_con(conn) {
+ conf_state = make_shared(conn);
}
- ~GNMIService() {std::cout << "Quitting GNMI Server" << std::endl; }
+ ~GNMIService() {BOOST_LOG_TRIVIAL(info) << "Quitting GNMI Server"; }
Status Capabilities(ServerContext* context,
const CapabilityRequest* request, CapabilityResponse* response);
@@ -60,10 +58,22 @@ class GNMIService final : public gNMI::Service
Status Subscribe(ServerContext* context,
ServerReaderWriter* stream);
+ Status Confirm(ServerContext *context,
+ const ConfirmRequest *request, ConfirmResponse *response);
+
+ Status Rpc(ServerContext *context,
+ const RpcRequest *request, RpcResponse *response);
+
+ static void TryCancelAll(void);
+
private:
- sysrepo::S_Connection sr_con; //sysrepo connection
- sysrepo::S_Session sr_sess; //sysrepo session
- shared_ptr encodef; //support for json ietf encoding
+ void ServerContextUpdate(ServerContext *ctx, bool add);
+ sysrepo::Connection sr_con; //sysrepo connection
+ shared_ptr conf_state;
};
+void RunServer(string bind_addr, shared_ptr cred, sysrepo::Connection sr_conn, std::promise ready = std::promise());
+
+void SetupSignalHandler(bool daemon = true);
+
#endif //_GNMI_SERVER_H
diff --git a/src/gnmi/gnmi_server.cpp b/src/gnmi/gnmi_server.cpp
new file mode 100644
index 0000000..6c85544
--- /dev/null
+++ b/src/gnmi/gnmi_server.cpp
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2020 Yohan Pipereau
+ * Copyright 2025 Graphiant Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+
+#include
+#include
+
+#include "gnmi/gnmi.h"
+#include
+#include
+
+using namespace std;
+
+static struct {
+ unique_ptr server;
+ int pipefd[2];
+} g_state;
+
+extern "C" void signal_handler(int signum) {
+ if (write(g_state.pipefd[1], &signum, sizeof signum) < 0) {
+ exit(2);
+ }
+}
+
+void SetupSignalHandler(bool is_daemon)
+{
+ // Set up the signal handler
+ if (pipe(g_state.pipefd) < 0) {
+ cerr << "Failed to create signal handler pipe " << strerror(errno) << endl;
+ exit(1);
+ }
+
+ // Block all signals for the main thread and other new threads
+ sigset_t set;
+ sigfillset(&set);
+ pthread_sigmask(SIG_BLOCK, &set, NULL);
+
+ // Register the signal handler
+ struct sigaction sa;
+ sa.sa_handler = &signal_handler;
+ sigemptyset(&sa.sa_mask);
+ sa.sa_flags = 0;
+ sigaction(SIGTERM, &sa, NULL);
+
+ // tests can receive SIGINT from user to interrupt the tests under gdb.
+ // So, only register for SIGINT, if we are running in daemon mode
+ if (is_daemon) {
+ sigaction(SIGINT, &sa, NULL);
+ }
+}
+
+static void wait_for_terminate(void)
+{
+ int signal = 0;
+
+ // UnBlock all signals for this thread
+ sigset_t set;
+ sigfillset(&set);
+ pthread_sigmask(SIG_UNBLOCK, &set, NULL);
+
+ while (read(g_state.pipefd[0], &signal, sizeof signal) < 0) {
+ // ignore interrupted system call
+ }
+
+ BOOST_LOG_TRIVIAL(debug) << "Shutting down due to " << strsignal(signal) << " signal";
+ GNMIService::TryCancelAll();
+
+ g_state.server->Shutdown();
+}
+
+void RunServer(string bind_addr, shared_ptr cred, sysrepo::Connection sr_conn, std::promise ready)
+{
+ // Get log environment variable
+ get_log_env();
+ // Get the maximum message size for Rx and Tx over gRPC configured in the environment.
+ const char *max_msg_size_kbytes = std::getenv("GNMI_MAX_MSG_SIZE_KB");
+ int GNMI_MAX_MSG_SIZE = atoi(max_msg_size_kbytes ? max_msg_size_kbytes : "65536");
+
+ try {
+ GNMIService gnmi(sr_conn); //gNMI Service
+
+ ServerBuilder builder;
+ builder.AddListeningPort(bind_addr, cred);
+ builder.RegisterService(&gnmi);
+
+ builder.SetMaxReceiveMessageSize(GNMI_MAX_MSG_SIZE * 1024);
+ builder.SetMaxSendMessageSize(GNMI_MAX_MSG_SIZE * 1024);
+ BOOST_LOG_TRIVIAL(info) << "Server MaxMsgSize " << GNMI_MAX_MSG_SIZE << " KB";
+
+ g_state.server = builder.BuildAndStart();
+ ready.set_value();
+
+ if (g_state.server == nullptr) {
+ BOOST_LOG_TRIVIAL(error) << "Failed to build gRPC server";
+ exit(1);
+ }
+
+ if (bind_addr.find(":") == string::npos) {
+ BOOST_LOG_TRIVIAL(info) << "Server listening on " << bind_addr << ":443";
+ } else {
+ BOOST_LOG_TRIVIAL(info) << "Server listening on " << bind_addr;
+ }
+
+ wait_for_terminate();
+ } catch (sysrepo::ErrorWithCode &exc) {
+ BOOST_LOG_TRIVIAL(error) << "Connection to sysrepo failed " << exc.what();
+ exit(1);
+ }
+
+ BOOST_LOG_TRIVIAL(info) << "GNMI Server exited";
+}
+
diff --git a/src/gnmi/rpc.cpp b/src/gnmi/rpc.cpp
new file mode 100644
index 0000000..c6bdf0e
--- /dev/null
+++ b/src/gnmi/rpc.cpp
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2025 Graphiant Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+
+#include "rpc.h"
+#include
+#include
+
+using namespace std;
+using google::protobuf::RepeatedPtrField;
+using grpc::StatusCode;
+using namespace libyang;
+
+namespace impl {
+
+// Implements gNMI Rpc RPC
+grpc::Status Rpc::run(const RpcRequest *request, RpcResponse *response) {
+ try {
+ auto xpath = gnmi_to_xpath(request->path());
+
+ // Convert RPC call timeout from seconds. Use 2000ms as default (per SR_RPC_CB_TIMEOUT)
+ // Max timeout of 10000ms (ensure less than SR_MAIN_LOCK_TIMEOUT)
+ uint32_t timeout = request->timeout() ? request->timeout() * 1000 : 2000;
+ if (timeout > 10000) {
+ timeout = 10000;
+ }
+
+ BOOST_LOG_TRIVIAL(debug) << "Rpc RPC (" << xpath << ") timeout " << timeout / 1000 << "s";
+
+ auto [status, input_node] = encodef->decode(xpath, request->val(), EncodePurpose::Rpc);
+
+ if (!status.ok()) {
+ BOOST_LOG_TRIVIAL(warning) << "Rpc input value error: " << status.error_message();
+ return status;
+ }
+
+ auto output_node = sr_sess.sendRPC(input_node.value(), std::chrono::milliseconds(timeout));
+
+ response->set_timestamp(get_time_nanosec());
+ if (!output_node.has_value()) {
+ return grpc::Status::OK;
+ }
+
+ status = encodef->encode(request->encoding(), *output_node, response->mutable_val());
+ if (!status.ok())
+ BOOST_LOG_TRIVIAL(warning) << "Rpc output value error: " << status.error_message();
+
+ return status;
+
+ } catch (invalid_argument &exc) {
+ return grpc::Status(StatusCode::INVALID_ARGUMENT, exc.what());
+ } catch (std::exception& ex) {
+ string err_str;
+ auto errors = sr_sess.getErrors();
+ if (!errors.empty()) {
+ err_str = errors[0].errorMessage;
+ } else {
+ err_str = ex.what();
+ }
+ BOOST_LOG_TRIVIAL(warning) << "RPC error: " << err_str;
+ return grpc::Status(StatusCode::ABORTED, err_str);
+ }
+}
+
+} // namespace impl
diff --git a/src/gnmi/rpc.h b/src/gnmi/rpc.h
new file mode 100644
index 0000000..c2f35c2
--- /dev/null
+++ b/src/gnmi/rpc.h
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2025 Graphiant Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _GNMI_RPC_H
+#define _GNMI_RPC_H
+
+#include
+#include
+
+#include "encode/encode.h"
+
+using namespace gnmi;
+using google::protobuf::RepeatedPtrField;
+using grpc::Status;
+
+namespace impl {
+
+class Rpc {
+public:
+ Rpc(sysrepo::Session sess)
+ : sr_sess(sess)
+ {
+ encodef = std::make_shared(sr_sess);
+ }
+ ~Rpc() {}
+
+ grpc::Status run(const RpcRequest *req, RpcResponse *response);
+
+ private:
+ sysrepo::Session sr_sess; //sysrepo session
+ shared_ptr encodef; //support for json ietf encoding
+};
+
+} // namespace impl
+
+#endif //_GNMI_RPC_H
diff --git a/src/gnmi/set.cpp b/src/gnmi/set.cpp
index b5b1780..5a0686f 100644
--- a/src/gnmi/set.cpp
+++ b/src/gnmi/set.cpp
@@ -1,5 +1,6 @@
/*
* Copyright 2020 Yohan Pipereau
+ * Copyright 2025 Graphiant Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,178 +16,308 @@
*/
#include "set.h"
+#include "confirm.h"
#include "encode/encode.h"
#include
#include
+#include
+#include
+#include
using namespace sysrepo;
using namespace std;
+using namespace libyang;
namespace impl {
-StatusCode Set::handleUpdate(Update in, UpdateResult *out, string prefix)
+std::tuple> Set::handleUpdate(Update in, UpdateResult *out, string prefix_str, const Path &prefix, string op)
{
- shared_ptr sval;
//Parse request
- if (!in.has_path() || !in.has_val()) {
- BOOST_LOG_TRIVIAL(error) << "Update no path or value";
- return StatusCode::INVALID_ARGUMENT;
- }
+ if (!in.has_path() || !in.has_val())
+ return std::make_tuple(grpc::Status(StatusCode::INVALID_ARGUMENT, "Update no path or value"), std::nullopt);
- string fullpath = prefix + gnmi_to_xpath(in.path());
- TypedValue reqval = in.val();
- BOOST_LOG_TRIVIAL(debug) << "Update" << fullpath;
-
- switch (reqval.value_case()) {
- case gnmi::TypedValue::ValueCase::kStringVal: /* No encoding */
- sval = make_shared(reqval.string_val().c_str());
- sr_sess->set_item(fullpath.c_str(), sval);
- break;
- case gnmi::TypedValue::ValueCase::kIntVal: /* No Encoding */
- sval = make_shared(reqval.int_val(), SR_INT64_T);
- sr_sess->set_item(fullpath.c_str(), sval);
- break;
- case gnmi::TypedValue::ValueCase::kUintVal: /* No Encoding */
- sval = make_shared(reqval.uint_val(), SR_UINT64_T);
- sr_sess->set_item(fullpath.c_str(), sval);
- break;
- case gnmi::TypedValue::ValueCase::kBoolVal: /* No Encoding */
- sval = make_shared(reqval.bool_val());
- sr_sess->set_item(fullpath.c_str(), sval);
- break;
- case gnmi::TypedValue::ValueCase::kBytesVal:
- throw std::invalid_argument("Unsupported BYTES Encoding");
- return StatusCode::UNIMPLEMENTED;
- case gnmi::TypedValue::ValueCase::kFloatVal:
- sval = make_shared(static_cast(reqval.float_val()));
- sr_sess->set_item(fullpath.c_str(), sval);
- break;
- case gnmi::TypedValue::ValueCase::kDecimalVal: /* No Encoding */
- throw std::invalid_argument("Unsupported Decimal64 type");
- return StatusCode::UNIMPLEMENTED;
- case gnmi::TypedValue::ValueCase::kLeaflistVal:
- throw std::invalid_argument("Unsupported leaflist type");
- return StatusCode::UNIMPLEMENTED;
- case gnmi::TypedValue::ValueCase::kAnyVal:
- throw std::invalid_argument("Unsupported PROTOBUF Encoding");
- return StatusCode::UNIMPLEMENTED;
- case gnmi::TypedValue::ValueCase::kJsonVal:
- throw std::invalid_argument("Unsupported JSON Encoding");
- return StatusCode::UNIMPLEMENTED;
- case gnmi::TypedValue::ValueCase::kJsonIetfVal:
- try {
- encodef->json_update(reqval.json_ietf_val());
- } catch (runtime_error &err) {
- //wrong input field must reply an error to gnmi client
- throw std::invalid_argument(err.what());
- return StatusCode::INVALID_ARGUMENT;
- } catch (invalid_argument &err) {
- throw std::invalid_argument(err.what());
- return StatusCode::INVALID_ARGUMENT;
- }
- break;
- case gnmi::TypedValue::ValueCase::kAsciiVal:
- throw std::invalid_argument("Unsupported ASCII Encoding");
- return StatusCode::UNIMPLEMENTED;
- case gnmi::TypedValue::ValueCase::kProtoBytes:
- throw std::invalid_argument("Unsupported PROTOBUF BYTE Encoding");
- return StatusCode::UNIMPLEMENTED;
- case gnmi::TypedValue::ValueCase::VALUE_NOT_SET:
- throw std::invalid_argument("Value not set");
- return StatusCode::INVALID_ARGUMENT;
- default:
- throw std::invalid_argument("Unknown value type");
- return StatusCode::INVALID_ARGUMENT;
+ string fullpath;
+ try {
+ gnmi_check_origin(prefix, in.path());
+ if (prefix.elem_size() > 0) {
+ fullpath += prefix_str;
+ }
+ fullpath += gnmi_to_xpath(in.path());
+ } catch (invalid_argument &exc) {
+ return std::make_tuple(grpc::Status(StatusCode::INVALID_ARGUMENT, exc.what()), std::nullopt);
}
+ BOOST_LOG_TRIVIAL(debug) << "Update (" << op << ") " << fullpath;
+
+ auto result = encodef->update(fullpath, in.val(), op);
//Fill in Reponse
out->set_allocated_path(in.release_path());
- return StatusCode::OK;
+ return result;
}
-Status Set::run(const SetRequest* request, SetResponse* response)
+grpc::Status Set::run(const SetRequest* request, SetResponse* response)
{
std::string prefix = "";
+ std::vector results;
+ UpdateTransaction xact;
+ auto ietf_nc_mod = sr_sess.getContext().getModuleImplemented("ietf-netconf").value();
+
+ // For observability, set transaction_id as log_id for subscribers and gnmi logging.
+ encodef->set_log_id(request->transaction_id());
+
+ // Check if we're waiting for a Confirm RPC
+ if (conf_state->get_wait_confirm()) {
+ return grpc::Status(StatusCode::UNAVAILABLE, "Previous Set has to be confirmed");
+ }
+
+ // Check if Set requires Confirm
+ if (request->has_confirm()) {
+ const ConfirmParmsRequest &conf_parms = request->confirm();
+ BOOST_LOG_TRIVIAL(debug)
+ << "Confirm msg has timeout=" << conf_parms.timeout_secs();
+ BOOST_LOG_TRIVIAL(debug)
+ << "Confirm msg has ignore-system-state: " << conf_parms.ignore_system_state();
+
+ if (not conf_parms.ignore_system_state()) {
+ // We have to check system state
+ }
+
+ // This (re)starts the timer, so as long as work here lasts less than
+ // timeout...
+ std::string err_msg = "";
+ if (not conf_state->set_wait_confirm(conf_parms.timeout_secs(), err_msg)) {
+ // Because of check above, should happen only in race condition
+ return grpc::Status(StatusCode::UNAVAILABLE, err_msg);
+ }
+
+ // Add ConfirmParmsResponse in SetResponse
+ ConfirmParmsResponse confirm;
+ confirm.set_min_wait_secs(conf_state->get_min_wait_conf_secs());
+ confirm.set_timeout_secs(conf_state->get_timeout_secs());
+ response->mutable_confirm()->CopyFrom(confirm);
+
+ }
if (request->extension_size() > 0) {
- BOOST_LOG_TRIVIAL(error) << "Extensions not implemented";
- return Status(StatusCode::UNIMPLEMENTED, "Extensions not implemented");
+ conf_state->clr_wait_confirm();
+ return grpc::Status(StatusCode::UNIMPLEMENTED, "not supported");
}
response->set_timestamp(get_time_nanosec());
/* Prefix for gNMI path */
if (request->has_prefix()) {
- prefix = gnmi_to_xpath(request->prefix());
+ try {
+ prefix = gnmi_to_xpath(request->prefix());
+ } catch (invalid_argument &exc) {
+ BOOST_LOG_TRIVIAL(error) << exc.what()
+ << ". Transaction-id:"
+ << request->transaction_id();
+ conf_state->clr_wait_confirm();
+ return grpc::Status(StatusCode::INVALID_ARGUMENT, exc.what());
+ }
BOOST_LOG_TRIVIAL(debug) << "prefix is " << prefix;
response->mutable_prefix()->CopyFrom(request->prefix());
}
/* gNMI paths to delete */
if (request->delete__size() > 0) {
+
+ // sort the paths to delete in reverse order to not delete children after parents
+ std::set> del_paths;
for (auto delpath : request->delete_()) {
- //Parse request and config sysrepo
- string fullpath = prefix + gnmi_to_xpath(delpath);
+ // Parse request and config sysrepo
+ string fullpath;
+ try {
+ gnmi_check_origin(request->prefix(), delpath);
+ fullpath = prefix + gnmi_to_xpath(delpath);
+ del_paths.insert(fullpath);
+ } catch (invalid_argument &exc) {
+ BOOST_LOG_TRIVIAL(error) << exc.what()
+ << ". Transaction-id:"
+ << request->transaction_id();
+ conf_state->clr_wait_confirm();
+ return grpc::Status(StatusCode::INVALID_ARGUMENT, exc.what());
+ }
+
+ // Fill in Reponse
+ UpdateResult res;
+ *(res.mutable_path()) = delpath;
+ res.set_op(gnmi::UpdateResult::DELETE);
+ results.push_back(res);
+ }
+
+ for (auto &fullpath : del_paths) {
BOOST_LOG_TRIVIAL(debug) << "Delete " << fullpath;
try {
- sr_sess->delete_item(fullpath.c_str()); //EDIT_DEFAULT option
- } catch (const exception &exc) {
+ // We cannot use deleteItem here as sysrepo doesn't like it being
+ // mixed with edit_batch, so retrieve just the nodes referenced by
+ // the xpath and no deeper to avoid it being any more expensive
+ // than it has to be
+ auto del_root = sr_sess.getData(fullpath, 1);
+ if (!del_root.has_value())
+ throw invalid_argument("xpath \"" + fullpath + "\" not found");
+
+ if (fullpath.compare("/*") == 0) {
+ // Walk all siblings and add delete node to them
+ for (auto n : del_root->siblings()) {
+ n.newMeta(ietf_nc_mod, "ietf-netconf:operation", "remove");
+
+ BOOST_LOG_TRIVIAL(debug) << " 1. Delete path: " << n.path();
+ }
+ } else {
+ // Find the node(s) actually referenced by the path to mark them as
+ // requiring delete since they could well be deeper than the root
+ // node
+ auto set = del_root->findXPath(fullpath.c_str());
+ if (set.empty())
+ throw invalid_argument("xpath \"" + fullpath + "\" not found");
+
+ auto sr_mod = sr_sess.getContext().getModuleImplemented("sysrepo").value();
+ for (auto n : set) {
+ // Ensure we don't create any parent nodes - they might be deleted
+ // in this transaction.
+ auto p = n.parent();
+ while (p.has_value()) {
+ p->newMeta(sr_mod, "sysrepo:operation", "ether");
+ p = p->parent();
+ }
+
+ n.newMeta(ietf_nc_mod, "ietf-netconf:operation", "remove");
+ BOOST_LOG_TRIVIAL(debug) << " 2. Delete path: " << n.path();
+ }
+ }
+ xact.push(*del_root);
+ } catch (const invalid_argument &exc) {
BOOST_LOG_TRIVIAL(error) << exc.what();
- return Status(StatusCode::INTERNAL, "delete item failed");
+ // gNMI spec §3.4.6: In the case that a path specifies an element within the data tree that does not exist, these deletes MUST be silently accepted.
+ } catch (const exception &exc) {
+ BOOST_LOG_TRIVIAL(error) << exc.what()
+ << ". Transaction-id:"
+ << request->transaction_id();
+ conf_state->clr_wait_confirm();
+ return grpc::Status(StatusCode::INVALID_ARGUMENT, exc.what());
}
- //Fill in Reponse
- UpdateResult* res = response->add_response();
- *(res->mutable_path()) = delpath;
- res->set_op(gnmi::UpdateResult::DELETE);
- }
+ }
}
/* gNMI paths with value to replace */
if (request->replace_size() > 0) {
for (auto &upd : request->replace()) {
- UpdateResult* res = response->add_response();
+ UpdateResult res;
try {
- handleUpdate(upd, res, prefix);
+ auto [status, node] = handleUpdate(upd, &res, prefix, request->prefix(), "replace");
+ if (!status.ok()) {
+ BOOST_LOG_TRIVIAL(error) << "Fail building set notification: "
+ << status.error_message()
+ << ". Transaction-id: "
+ << request->transaction_id();
+ conf_state->clr_wait_confirm();
+ return status;
+ }
+
+ res.set_op(gnmi::UpdateResult::REPLACE);
+ results.push_back(res);
+ if (node.has_value()) {
+ xact.push(*node);
+ }
} catch (const invalid_argument &exc) {
- BOOST_LOG_TRIVIAL(error) << exc.what();
- return Status(StatusCode::INVALID_ARGUMENT, exc.what());
- } catch (const sysrepo_exception &exc) {
- BOOST_LOG_TRIVIAL(error) << exc.what();
- return Status(StatusCode::INTERNAL, exc.what());
+ BOOST_LOG_TRIVIAL(error) << exc.what()
+ << ". Transaction-id:"
+ << request->transaction_id();
+ conf_state->clr_wait_confirm();
+ return grpc::Status(StatusCode::INVALID_ARGUMENT, exc.what());
+ } catch (sysrepo::Error &exc) {
+ BOOST_LOG_TRIVIAL(error) << exc.what()
+ << ". Transaction-id:"
+ << request->transaction_id();
+ conf_state->clr_wait_confirm();
+ return grpc::Status(StatusCode::INTERNAL, exc.what());
} catch (const exception &exc) { //Any other exception
- BOOST_LOG_TRIVIAL(error) << exc.what();
- return Status(StatusCode::INTERNAL, exc.what());
+ BOOST_LOG_TRIVIAL(error) << exc.what()
+ << ". Transaction-id:"
+ << request->transaction_id();
+ conf_state->clr_wait_confirm();
+ return grpc::Status(StatusCode::INTERNAL, exc.what());
}
- res->set_op(gnmi::UpdateResult::REPLACE);
}
}
/* gNMI paths with value to update */
if (request->update_size() > 0) {
for (auto &upd : request->update()) {
- UpdateResult* res = response->add_response();
+ UpdateResult res;
try {
- handleUpdate(upd, res, prefix);
+ auto [status, node] = handleUpdate(upd, &res, prefix, request->prefix(), "merge");
+ if (!status.ok()) {
+ BOOST_LOG_TRIVIAL(error) << "Fail building set notification: "
+ << status.error_message()
+ << ". Transaction-id: "
+ << request->transaction_id();
+ conf_state->clr_wait_confirm();
+ return status;
+ }
+ res.set_op(gnmi::UpdateResult::UPDATE);
+ results.push_back(res);
+ if (node.has_value()) {
+ xact.push(*node);
+ }
} catch (const invalid_argument &exc) {
- BOOST_LOG_TRIVIAL(error) << exc.what();
- return Status(StatusCode::INVALID_ARGUMENT, exc.what());
- } catch (const sysrepo_exception &exc) {
- BOOST_LOG_TRIVIAL(error) << exc.what();
- return Status(StatusCode::INTERNAL, exc.what());
+ BOOST_LOG_TRIVIAL(error) << exc.what()
+ << ". Transaction-id:"
+ << request->transaction_id();
+ conf_state->clr_wait_confirm();
+ return grpc::Status(StatusCode::INVALID_ARGUMENT, exc.what());
+ } catch (const sysrepo::Error &exc) {
+ BOOST_LOG_TRIVIAL(error) << exc.what()
+ << ". Transaction-id:"
+ << request->transaction_id();
+ conf_state->clr_wait_confirm();
+ return grpc::Status(StatusCode::INTERNAL, exc.what());
}
- res->set_op(gnmi::UpdateResult::UPDATE);
}
}
try {
- sr_sess->commit();
+ if (xact.first_node.has_value()) {
+ sr_sess.editBatch(xact.first_node.value(), sysrepo::DefaultOperation::Merge);
+ }
+
+ sr_sess.applyChanges();
+
+ if (!request->has_confirm()) {
+ sr_sess_startup.copyConfig(sysrepo::Datastore::Running);
+ }
+ conf_state->write_set_transaction_id(request->transaction_id());
+ } catch (const sysrepo::Error &exc) {
+ conf_state->clr_wait_confirm();
+ string err_str;
+ auto errors = sr_sess.getErrors();
+ if (errors.size()) {
+ err_str = errors[0].errorMessage;
+ } else {
+ err_str = exc.what();
+ }
+ sr_sess.discardChanges();
+ BOOST_LOG_TRIVIAL(error) << "commit error: "
+ << err_str
+ << ". Transaction-id:"
+ << request->transaction_id();
+ return grpc::Status(StatusCode::ABORTED, err_str);
} catch (const exception &exc) {
- BOOST_LOG_TRIVIAL(error) << exc.what();
- return Status(StatusCode::INTERNAL, "commit failed");
+ BOOST_LOG_TRIVIAL(error) << exc.what()
+ << ". Transaction-id:"
+ << request->transaction_id();
+ sr_sess.discardChanges();
+ return grpc::Status(StatusCode::INTERNAL, exc.what());
}
- return Status::OK;
-}
+ for (auto r : results)
+ *(response->add_response()) = r;
+ conf_state->reset_timers();
+ return grpc::Status::OK;
}
+
+} // namespace impl
diff --git a/src/gnmi/set.h b/src/gnmi/set.h
index 04ecc2c..a461c12 100644
--- a/src/gnmi/set.h
+++ b/src/gnmi/set.h
@@ -1,5 +1,6 @@
/*
* Copyright 2020 Yohan Pipereau
+ * Copyright 2025 Graphiant Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,9 +18,11 @@
#ifndef _GNMI_SET_H
#define _GNMI_SET_H
+#include
#include
+#include
-#include
+#include "confirm.h"
#include "encode/encode.h"
using namespace gnmi;
@@ -30,18 +33,23 @@ namespace impl {
class Set {
public:
- Set(sysrepo::S_Session sess, std::shared_ptr encode)
- : sr_sess(sess), encodef(encode) {}
+ Set(sysrepo::Session running_sess, sysrepo::Session startup_sess, shared_ptr confirm_state)
+ : sr_sess(running_sess), sr_sess_startup(startup_sess), conf_state(confirm_state)
+ {
+ encodef = std::make_shared(sr_sess);
+ }
~Set() {}
Status run(const SetRequest* request, SetResponse* response);
private:
- StatusCode handleUpdate(Update in, UpdateResult *out, string prefix);
+ std::tuple> handleUpdate(Update in, UpdateResult *out, string prefix_str, const Path &prefix, string op);
private:
- sysrepo::S_Session sr_sess; //sysrepo session
+ sysrepo::Session sr_sess; //sysrepo running datastore session
+ sysrepo::Session sr_sess_startup; //sysrepo startup datastore session
shared_ptr encodef; //support for json ietf encoding
+ shared_ptr conf_state; // commit confirm state
};
}
diff --git a/src/gnmi/subscribe.cpp b/src/gnmi/subscribe.cpp
index 26a49af..8ce9a7f 100644
--- a/src/gnmi/subscribe.cpp
+++ b/src/gnmi/subscribe.cpp
@@ -1,5 +1,6 @@
/*
* Copyright 2020 Yohan Pipereau
+ * Copyright 2025 Graphiant Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,117 +19,85 @@
#include
#include
#include
+#include
+#include
#include
+#include
+#include
+#include
#include "subscribe.h"
#include
#include
+#include "utils/sysrepo.h"
using namespace std;
using namespace chrono;
using google::protobuf::RepeatedPtrField;
-using sysrepo::sysrepo_exception;
namespace impl {
Status
Subscribe::BuildSubsUpdate(RepeatedPtrField* updateList,
- const Path &path, string fullpath,
- gnmi::Encoding encoding)
+ const Path &prefix, string fullpath,
+ gnmi::Encoding encoding)
{
Update *update;
- TypedValue *gnmival;
- vector json_vec;
- string *json_ietf;
- int idx;
- google::protobuf::Map *key;
-
- /* Create Update message */
- update = updateList->Add();
- update->mutable_path()->CopyFrom(path);
- gnmival = update->mutable_val();
-
- /* Refresh configuration data from current session */
- sr_sess->refresh();
-
- /* Create appropriate TypedValue message based on encoding */
- switch (encoding) {
- case gnmi::JSON:
- case gnmi::JSON_IETF:
- /* Get sysrepo subtree data corresponding to XPATH */
- try {
- json_vec = encodef->json_read(fullpath);
- } catch (invalid_argument &exc) {
- return Status(StatusCode::NOT_FOUND, exc.what());
- } catch (sysrepo_exception &exc) {
- BOOST_LOG_TRIVIAL(error) << "Fail getting items from sysrepo: "
- << exc.what();
- return Status(StatusCode::INVALID_ARGUMENT, exc.what());
- }
-
- /* Create new update message for every tree collected */
- for (auto it : json_vec) {
- update = updateList->Add();
- update->mutable_path()->CopyFrom(path);
- if (!it.key.first.empty()) {
- BOOST_LOG_TRIVIAL(debug) << "putting list entries key in gNMI path";
- idx = update->mutable_path()->elem_size() - 1;
- key = update->mutable_path()->mutable_elem(idx)->mutable_key();
- (*key)[it.key.first] = it.key.second;
- }
-
- gnmival = update->mutable_val();
+ if (prefix.elem_size() > 0) {
+ string str = gnmi_to_xpath(prefix);
+ fullpath = str + fullpath;
+ }
- json_ietf = gnmival->mutable_json_ietf_val();
- *json_ietf = it.data;
+ SessionDsSwitcher ds_switch(sr_sess, sysrepo::Datastore::Operational);
+
+ try {
+ /* Get multiple subtree for YANG lists or one for other YANG types */
+ auto sr_trees = sr_sess.getData(fullpath.c_str());
+ /* The path not (yet) existing isn't an error, so just return an empty set */
+ if (!sr_trees.has_value())
+ return Status::OK;
+
+ for (auto n : sr_trees->findXPath(fullpath.c_str())) {
+ update = updateList->Add();
+ node_get_gnmi_path(n, *update->mutable_path());
+ auto status = encodef->encode(encoding, n, update->mutable_val());
+ if (!status.ok()) {
+ updateList->Clear();
+ return status;
}
-
- break;
-
- case gnmi::PROTO:
- BOOST_LOG_TRIVIAL(error) << "Deviation from specification, Unsupported Yet";
- break;
-
- default:
- return Status(StatusCode::UNIMPLEMENTED, Encoding_Name(encoding));
+ }
+ } catch (invalid_argument &exc) {
+ updateList->Clear();
+ return Status(StatusCode::NOT_FOUND, exc.what());
+ } catch (sysrepo::ErrorWithCode &exc) {
+ BOOST_LOG_TRIVIAL(error) << "Fail getting items from sysrepo: "
+ << exc.code();
+ updateList->Clear();
+ return Status(StatusCode::INVALID_ARGUMENT, exc.what());
}
return Status::OK;
}
/**
- * BuildSubscribeNotification - Build a Notification message.
+ * BuildSubscribeNotification - Build a Notification message, excluding
+ * subscriptions which are on-change(this is done elsewhere, racy if done here).
* Contrary to Get Notification, gnmi specification highly recommands to
* put multiple in the same Notification message.
* @param notification the notification that is constructed by this function.
* @param request the SubscriptionList from SubscribeRequest to answer to.
+ * @param sample indicates whether there is at least 1 sample subscr
*/
Status
Subscribe::BuildSubscribeNotification(Notification *notification,
- const SubscriptionList& request)
+ const SubscriptionList& request,
+ bool *sample)
{
RepeatedPtrField* updateList = notification->mutable_update();
Status status;
- switch (request.encoding()) {
- case gnmi::JSON:
- case gnmi::JSON_IETF:
- BOOST_LOG_TRIVIAL(debug) << "JSON IETF";
- break;
-
- case gnmi::PROTO:
- BOOST_LOG_TRIVIAL(error) << "PROTO encoding will soon be supported";
- return Status(StatusCode::UNIMPLEMENTED, Encoding_Name(request.encoding()));
- break;
-
- default:
- BOOST_LOG_TRIVIAL(warning) << "Unsupported Encoding "
- << Encoding_Name(request.encoding());
- return Status(StatusCode::UNIMPLEMENTED, Encoding_Name(request.encoding()));
- }
-
// Defined refer to a long Path by a shorter one: alias
if (request.use_aliases()) {
BOOST_LOG_TRIVIAL(warning) << "Unsupported usage of aliases";
@@ -136,23 +105,50 @@ Subscribe::BuildSubscribeNotification(Notification *notification,
}
/* Check if only updates should be sent */
- if (request.updates_only())
+ if (request.updates_only()) {
BOOST_LOG_TRIVIAL(warning) << "Unsupported updates_only, send all paths";
+ return Status(StatusCode::UNIMPLEMENTED, "updates-only not supported");
+ }
/* Get time since epoch in milliseconds */
notification->set_timestamp(get_time_nanosec());
+ // gNMI spec §2.2.2.1:
+ // When set in the prefix in a request, GetRequest, SetRequest or
+ // SubscribeRequest, the field MUST be reflected in the prefix of the
+ // corresponding GetResponse, SetResponse or SubscribeResponse by a
+ // server.
if (request.has_prefix())
- notification->mutable_prefix()->CopyFrom(request.prefix());
+ notification->mutable_prefix()->set_target(request.prefix().target());
+ if (sample) {
+ *sample = false;
+ }
/* Fill Update RepeatedPtrField in Notification message
* Update field contains only data elements that have changed values. */
for (int i = 0; i < request.subscription_size(); i++) {
Subscription sub = request.subscription(i);
+ if (request.mode() == SubscriptionList_Mode_STREAM &&
+ (sub.mode() == SubscriptionMode::TARGET_DEFINED ||
+ sub.mode() == SubscriptionMode::ON_CHANGE)) {
+ BOOST_LOG_TRIVIAL(debug) << "On-change, getting initial data later: " << gnmi_to_xpath(sub.path());
+ continue;
+ }
+ if (sample) {
+ *sample = true;
+ }
// Fetch all found counters value for a requested path
- status = BuildSubsUpdate(updateList, sub.path(), gnmi_to_xpath(sub.path()),
- request.encoding());
+ string str;
+ try {
+ gnmi_check_origin(request.prefix(), sub.path());
+
+ status = BuildSubsUpdate(updateList, request.prefix(),
+ gnmi_to_xpath(sub.path()), request.encoding());
+ } catch (invalid_argument &exc) {
+ BOOST_LOG_TRIVIAL(error) << exc.what();
+ return Status(StatusCode::INVALID_ARGUMENT, exc.what());
+ }
if (!status.ok()) {
BOOST_LOG_TRIVIAL(error) << "Fail building update for "
<< gnmi_to_xpath(sub.path());
@@ -165,6 +161,288 @@ Subscribe::BuildSubscribeNotification(Notification *notification,
return Status::OK;
}
+/**
+ * BuildSubscribeNotificationForChanges - Build a Notification message.
+ * @param notification the notification that is constructed by this function.
+ * @param request the SubscriptionList from SubscribeRequest to answer to.
+ * @param xpath The xpath that the registration is firing for
+ * @param session The sysrepo session for the update
+ */
+Status
+Subscribe::BuildSubscribeNotificationForChanges(Notification *notification,
+ const SubscriptionList& request,
+ string& xpath,
+ sysrepo::Session session)
+{
+ auto updateList = notification->mutable_update();
+ auto deleteList = notification->mutable_delete_();
+ Status status;
+
+ // Defined refer to a long Path by a shorter one: alias
+ if (request.use_aliases()) {
+ BOOST_LOG_TRIVIAL(warning) << "Unsupported usage of aliases";
+ return Status(StatusCode::UNIMPLEMENTED, "alias not supported");
+ }
+
+ /* Check if only updates should be sent */
+ if (request.updates_only()) {
+ BOOST_LOG_TRIVIAL(warning) << "Unsupported updates_only, send all paths";
+ return Status(StatusCode::UNIMPLEMENTED, "updates-only not supported");
+ }
+
+ /* Get time since epoch in milliseconds */
+ notification->set_timestamp(get_time_nanosec());
+
+ // gNMI spec §2.2.2.1:
+ // When set in the prefix in a request, GetRequest, SetRequest or
+ // SubscribeRequest, the field MUST be reflected in the prefix of the
+ // corresponding GetResponse, SetResponse or SubscribeResponse by a
+ // server.
+ if (request.has_prefix())
+ notification->mutable_prefix()->set_target(request.prefix().target());
+
+ /* Fill Update RepeatedPtrField in Notification message
+ * Update field contains only data elements that have changed values. */
+
+ try {
+ auto last_change = make_pair(std::string(""), sysrepo::ChangeOperation::Created);
+
+ string changes_path(xpath);
+ changes_path += "//.";
+ auto iter = session.getChanges(changes_path.c_str());
+ for (const auto& change : iter) {
+ auto node_path = change.node.path();
+ if (!last_change.first.empty() &&
+ last_change.second == change.operation &&
+ // If we have the identifier of one leaf as a substring of another at the same level,
+ // we can confuse between the two.
+ // for example searching for "ike-connection" and finding "ike-connection-up".
+ // So search with "/" suffixed and prevent this mix-up.
+ (node_path.rfind(last_change.first + "/", 0) == 0 ||
+ node_path == last_change.first)
+ ) {
+ continue;
+ }
+ last_change = make_pair(node_path, change.operation);
+
+ // Also done for updated nodes due to gNMI spec §3.5.2.3:
+ // > To replace the contents of an entire node within the tree, the target populates
+ // > the delete field with the path of the node being removed, along with the new
+ // > contents within the update field.
+ if (change.operation != sysrepo::ChangeOperation::Created) {
+ auto path_p = deleteList->Add();
+ Path path;
+ node_get_gnmi_path(change.node, path);
+ *path_p = path;
+ }
+ if (change.operation != sysrepo::ChangeOperation::Deleted) {
+ auto update = updateList->Add();
+
+ node_get_gnmi_path(change.node, *update->mutable_path());
+ // Remove all of the attributes from nodes which we don't need and may confuse parsers of the JSON when
+ // using that encoding.
+ auto opts = static_cast(libyang::DuplicationOptions::NoMeta) |
+ static_cast(libyang::DuplicationOptions::Recursive);
+ auto node = change.node.duplicate(static_cast(opts));
+ status = encodef->encode(request.encoding(), node, update->mutable_val());
+ if (!status.ok())
+ return status;
+ }
+ }
+ } catch (sysrepo::ErrorWithCode &exc) {
+ BOOST_LOG_TRIVIAL(error) << "Fail processing module changes from sysrepo: "
+ << exc.what();
+ return Status(StatusCode::INVALID_ARGUMENT, exc.what());
+ } catch (invalid_argument &exc) {
+ BOOST_LOG_TRIVIAL(error) << exc.what();
+ return Status(StatusCode::INVALID_ARGUMENT, exc.what());
+ }
+
+ notification->set_atomic(false);
+
+ return Status::OK;
+}
+
+void Subscribe::triggerSampleUpdate(
+ ServerContext* context, Subscription &sub,
+ ServerReaderWriter* stream)
+{
+ SubscribeResponse response;
+ SubscriptionList updateList;
+
+ // Add the subscription entry to the subscription list
+ updateList.add_subscription()->CopyFrom(sub);
+
+ if (!context->IsCancelled()) {
+ auto status = BuildSubscribeNotification(response.mutable_update(),
+ updateList);
+ if(!status.ok()) {
+ // This is a hack to allow the Read in the parent thread to return,
+ // but it avoids needing to move to an asynchronous model just to return this one error
+ grpc::g_core_codegen_interface->grpc_call_cancel_with_status(
+ context->c_call(), static_cast(status.error_code()),
+ status.error_message().c_str(), nullptr);
+ return;
+ }
+ Write(stream, response);
+ response.Clear();
+ }
+}
+
+static void sample_timer_expiry(
+ const boost::system::error_code &e, std::shared_ptr t,
+ Subscription &sub, Subscribe *subscribe, ServerContext* context,
+ ServerReaderWriter* stream)
+{
+ if (e == boost::asio::error::operation_aborted) {
+ return;
+ }
+
+ subscribe->triggerSampleUpdate(context, sub, stream);
+
+ t->expires_at(t->expiry() + nanoseconds{sub.sample_interval()});
+ t->async_wait(boost::bind(sample_timer_expiry,
+ boost::asio::placeholders::error, t, sub, subscribe, context, stream));
+}
+
+void Subscribe::streamWorker(
+ ServerContext* context, SubscribeRequest request,
+ ServerReaderWriter* stream,
+ boost::asio::io_context &initial_update_io,
+ boost::asio::io_context &incr_update_io)
+{
+ vector> timers;
+
+ for (int i = 0; i < request.subscribe().subscription_size(); i++) {
+ Subscription sub = request.subscribe().subscription(i);
+ switch (sub.mode()) {
+ case SAMPLE: {
+ auto t = std::make_shared(incr_update_io, nanoseconds{sub.sample_interval()});
+ t->async_wait(boost::bind(sample_timer_expiry, boost::asio::placeholders::error, t, sub, this, context, stream));
+ timers.push_back(t);
+ break;
+ }
+ default:
+ break;
+ }
+ }
+
+ // Keep io_context running regardless if there are tasks to execute or not
+ boost::asio::executor_work_guard initial_work_guard(initial_update_io.get_executor());
+
+ initial_update_io.run();
+
+ boost::asio::executor_work_guard incr_work_guard(incr_update_io.get_executor());
+
+ incr_update_io.run();
+
+ BOOST_LOG_TRIVIAL(debug) << "Subscription stream worker exiting";
+}
+
+static void streamWorkerThread(Subscribe *sub, ServerContext* context, SubscribeRequest &request,
+ ServerReaderWriter* stream,
+ std::tuple io_context_tuple)
+{
+ boost::asio::io_context &initial_update_io = std::get<0>(io_context_tuple);
+ boost::asio::io_context &incr_update_io = std::get<1>(io_context_tuple);
+ sub->streamWorker(context, request, stream, initial_update_io, incr_update_io);
+}
+
+class SrModuleOnChangeParams {
+public:
+ SrModuleOnChangeParams(SubscribeRequest *request,
+ ServerReaderWriter *stream, Subscribe *subscribe,
+ boost::asio::io_context &initial_update_io_context,
+ boost::asio::io_context &incr_update_io_context) :
+ request(request), stream(stream), subscribe(subscribe), initial_update_io_context(initial_update_io_context),
+ incr_update_io_context(incr_update_io_context) {}
+
+ SubscribeRequest *request;
+ ServerReaderWriter* stream;
+ Subscribe *subscribe;
+ boost::asio::io_context &initial_update_io_context;
+ boost::asio::io_context &incr_update_io_context;
+
+ bool is_incremental(void) const {
+ // set incr_update=true and return the previous value.
+ return std::exchange(incr_update, true);
+ }
+private:
+ mutable bool incr_update = false;
+};
+
+sysrepo::ErrorCode srModuleOnChange(
+ sysrepo::Session session, std::string_view module_name, std::string_view xpath, sysrepo::Event event,
+ uint32_t request_id, const SrModuleOnChangeParams ¶ms)
+{
+ Status status;
+ auto response = make_unique();
+
+ (void)module_name;
+ (void)event;
+ (void)request_id;
+
+ string changes_path(xpath);
+
+ status = params.subscribe->BuildSubscribeNotificationForChanges(response->mutable_update(),
+ params.request->subscribe(), changes_path, session);
+ if (!status.ok()) {
+ BOOST_LOG_TRIVIAL(warning) << "unable to build update in response to notification for " << xpath;
+ return sysrepo::ErrorCode::Ok;
+ }
+
+ if (params.is_incremental()) {
+ params.subscribe->PostWrite(params.stream, std::move(response), params.incr_update_io_context);
+ } else {
+ params.subscribe->PostWrite(params.stream, std::move(response), params.initial_update_io_context);
+ }
+
+ return sysrepo::ErrorCode::Ok;
+}
+
+Status Subscribe::registerStreamOnChange(
+ SubscribeRequest &request, Subscription sub,
+ ServerReaderWriter* stream,
+ boost::asio::io_context &initial_update_io_context,
+ boost::asio::io_context &incr_update_io_context,
+ shared_ptr sr_sub,
+ vector ¶ms_vec)
+{
+ string fullpath = "";
+ try {
+ if (request.subscribe().prefix().elem_size() > 0 ||
+ request.subscribe().prefix().target().compare("")) {
+ fullpath = gnmi_to_xpath(request.subscribe().prefix());
+ }
+ fullpath += gnmi_to_xpath(sub.path());
+ } catch (invalid_argument &exc) {
+ BOOST_LOG_TRIVIAL(error) << exc.what();
+ return Status(StatusCode::INVALID_ARGUMENT, exc.what());
+ }
+
+ BOOST_LOG_TRIVIAL(debug) << "Subscribe (stream) " << fullpath;
+
+ SrModuleOnChangeParams params(&request, stream, this, initial_update_io_context, incr_update_io_context);
+ params_vec.push_back(params);
+ try {
+ auto params_ref = params_vec.back();
+ sr_sub->data_change_subscribe(
+ [params_ref]
+ (sysrepo::Session session, uint32_t sub_id, std::string_view module_name, std::optional xpath, sysrepo::Event event, uint32_t request_id)
+ {
+ (void)sub_id;
+ return srModuleOnChange(session, module_name, xpath.value(), event, request_id, params_ref);
+ },
+ fullpath.c_str(),
+ 0, sysrepo::SubscribeOptions::Passive | sysrepo::SubscribeOptions::DoneOnly | sysrepo::SubscribeOptions::Enabled);
+ } catch (const sysrepo::ErrorWithCode &exc) {
+ BOOST_LOG_TRIVIAL(error) << exc.what();
+ return Status(StatusCode::INTERNAL, exc.what());
+ }
+
+ return Status::OK;
+}
+
/**
* Handles SubscribeRequest messages with STREAM subscription mode by
* periodically sending updates to the client.
@@ -175,97 +453,132 @@ Status Subscribe::handleStream(
{
SubscribeResponse response;
Status status;
+ vector params_vec;
+ if (request.subscribe().subscription_size() == 0) {
+ return Status(StatusCode::INVALID_ARGUMENT,
+ "No subscription in message");
+ }
// Checks that sample_interval values are not higher than INT64_MAX
// i.e. 9223372036854775807 nanoseconds
for (int i = 0; i < request.subscribe().subscription_size(); i++) {
Subscription sub = request.subscribe().subscription(i);
- if (sub.sample_interval() > duration::max().count()) {
- context->TryCancel();
+ if (sub.sample_interval() > static_cast(duration::max().count()))
return Status(StatusCode::INVALID_ARGUMENT,
string("sample_interval must be less than ")
+ to_string(INT64_MAX) + " nanoseconds");
+
+ if (sub.mode() == SubscriptionMode::SAMPLE && nanoseconds{sub.sample_interval()} < milliseconds(200)) {
+ BOOST_LOG_TRIVIAL(warning) << "sample_interval " + to_string(sub.sample_interval()) +
+ " must be greater than " + to_string(nanoseconds{milliseconds(200)}.count()) +
+ " nanoseconds";
+ return Status(StatusCode::INVALID_ARGUMENT,
+ string("sample_interval ") + to_string(sub.sample_interval()) +
+ " must be greater than " + to_string(nanoseconds{milliseconds(200)}.count()) +
+ " nanoseconds");
}
+
}
- // Sends a first Notification message that updates all Subcriptions
+ // Get the initial data only for sample subscriptions
+ bool sample=false;
status = BuildSubscribeNotification(response.mutable_update(),
- request.subscribe());
- if (!status.ok()) {
- context->TryCancel();
+ request.subscribe(),
+ &sample);
+ if (!status.ok())
return status;
- }
- stream->Write(response);
- response.Clear();
- // Sends a SYNC message that indicates that initial synchronization
- // has completed, i.e. each Subscription has been updated once
- response.set_sync_response(true);
- stream->Write(response);
- response.Clear();
+ boost::asio::io_context initial_update_io_context;
+ boost::asio::io_context incr_update_io_context;
- // We use a vector of pairs instead of a map as we are going to iterate more
- // than we are going to retrieve specific keys.
- vector>> chronomap;
+ SessionDsSwitcher ds_switch(sr_sess, sysrepo::Datastore::Operational);
+ auto sr_sub = std::make_shared(sr_sess);
+
+ if (sample) {
+ BOOST_LOG_TRIVIAL(debug) << "Sending initial update for sample subscriptions with size:" << response.update().update_size();
+ // Sends a first Notification message that updates all sample subcriptions
+ Write(stream, response);
+ }
for (int i=0; iIsCancelled()) {
- auto start = high_resolution_clock::now();
-
- SubscribeRequest updateRequest(request);
- SubscriptionList* updateList(updateRequest.mutable_subscribe());
- updateList->clear_subscription();
-
- for (auto& pair : chronomap) {
- duration duration =
- high_resolution_clock::now()-pair.second;
- if (duration > nanoseconds{pair.first.sample_interval()}) {
- pair.second = high_resolution_clock::now();
- Subscription* sub = updateList->add_subscription();
- sub->CopyFrom(pair.first);
- }
- }
-
- if (updateList->subscription_size() > 0) {
- status = BuildSubscribeNotification(response.mutable_update(),
- updateRequest.subscribe());
- if(!status.ok()) {
- context->TryCancel();
- return status;
- }
- stream->Write(response);
- response.Clear();
- }
-
- // Caps the loop at 5 iterations per second
- auto loopTime = high_resolution_clock::now() - start;
- this_thread::sleep_for(milliseconds(200) - loopTime);
+ // Send to the worker thread
+ boost::asio::post(initial_update_io_context, [&]
+ {
+ // Sends a SYNC message that indicates that initial synchronization
+ // has completed, i.e. each Subscription has been updated once
+ SubscribeResponse response;
+ response.set_sync_response(true);
+ BOOST_LOG_TRIVIAL(debug) << "Sending sync response";
+ Write(stream, response);
+ initial_update_io_context.stop();
+ });
+
+ // Start a worker thread for SAMPLE and ON_CHANGE notifications (the only other type, TARGET_DEFINED, isn't
+ // supported).
+ auto thread = std::thread(streamWorkerThread, this, context, std::ref(request), stream,
+ std::make_tuple(std::ref(initial_update_io_context), std::ref(incr_update_io_context)));
+
+ // Read from client - note that isn't expected to succeed, but allows us to
+ // wait (without a busy loop) until the client cancels the streaming subscription and
+ // then we can terminate the worker thread immediately
+ SubscribeRequest request2;
+ auto success = stream->Read(&request2);
+
+ incr_update_io_context.stop();
+ thread.join();
+
+ if (success) {
+ BOOST_LOG_TRIVIAL(warning) << "out-of-order operation was requested on a STREAM subscription";
+ return Status(StatusCode::INVALID_ARGUMENT,
+ string("out-of-order operation was requested on a STREAM subscription"));
}
return Status::OK;
}
+void Subscribe::Write(
+ ServerReaderWriter* stream,
+ SubscribeResponse response)
+{
+ const std::lock_guard lock(stream_mutex);
+ stream->Write(response);
+}
+
+void Subscribe::PostWrite(
+ ServerReaderWriter* stream,
+ std::unique_ptr response, boost::asio::io_context &io_context)
+{
+ // Send to the worker thread
+ boost::asio::post(io_context, [this, stream, response = std::move(response)]
+ {
+ Write(stream, *response);
+ });
+}
+
/**
* Handles SubscribeRequest messages with ONCE subscription mode by updating
* all the Subscriptions once, sending a SYNC message, then closing the RPC.
*/
-Status Subscribe::handleOnce(ServerContext* context, SubscribeRequest request,
+Status Subscribe::handleOnce(SubscribeRequest request,
ServerReaderWriter* stream)
{
Status status;
@@ -274,18 +587,16 @@ Status Subscribe::handleOnce(ServerContext* context, SubscribeRequest request,
SubscribeResponse response;
status = BuildSubscribeNotification(response.mutable_update(),
request.subscribe());
- if (!status.ok()) {
- context->TryCancel();
+ if (!status.ok())
return status;
- }
- stream->Write(response);
+ Write(stream, response);
response.Clear();
// Sends a message that indicates that initial synchronization
// has completed, i.e. each Subscription has been updated once
response.set_sync_response(true);
- stream->Write(response);
+ Write(stream, response);
response.Clear();
return Status::OK;
@@ -295,7 +606,7 @@ Status Subscribe::handleOnce(ServerContext* context, SubscribeRequest request,
* Handles SubscribeRequest messages with POLL subscription mode by updating
* all the Subscriptions each time a Poll request is received.
*/
-Status Subscribe::handlePoll(ServerContext* context, SubscribeRequest request,
+Status Subscribe::handlePoll(SubscribeRequest request,
ServerReaderWriter* stream)
{
SubscribeRequest subscription = request;
@@ -309,12 +620,15 @@ Status Subscribe::handlePoll(ServerContext* context, SubscribeRequest request,
SubscribeResponse response;
status = BuildSubscribeNotification(response.mutable_update(),
subscription.subscribe());
- if (!status.ok()) {
- context->TryCancel();
+ if (!status.ok())
return status;
- }
- stream->Write(response);
+ Write(stream, response);
response.Clear();
+
+ // Reference 3.5.2.3:
+ // "For POLL subscriptions, after each set of updates for individual poll request, a SubscribeResponse message with the sync_response field set to true MUST be generated."
+ response.set_sync_response(true);
+ Write(stream, response);
break;
}
case request.kAliases:
@@ -348,22 +662,20 @@ Status Subscribe::run(ServerContext* context,
return Status(StatusCode::UNIMPLEMENTED, "Extensions not implemented");
}
- if (!request.has_subscribe()) {
- context->TryCancel();
+ if (!request.has_subscribe())
return Status(StatusCode::INVALID_ARGUMENT,
"SubscribeRequest needs non-empty SubscriptionList");
- }
switch (request.subscribe().mode()) {
case SubscriptionList_Mode_STREAM:
return handleStream(context, request, stream);
case SubscriptionList_Mode_ONCE:
- return handleOnce(context, request, stream);
+ return handleOnce(request, stream);
case SubscriptionList_Mode_POLL:
- return handlePoll(context, request, stream);
+ return handlePoll(request, stream);
default:
BOOST_LOG_TRIVIAL(error) << "Unknown subscription mode";
- return Status(StatusCode::UNKNOWN, "Unknown subscription mode");
+ return Status(StatusCode::UNIMPLEMENTED, "Unknown subscription mode");
}
return Status::OK;
diff --git a/src/gnmi/subscribe.h b/src/gnmi/subscribe.h
index a3c6c30..14505a7 100644
--- a/src/gnmi/subscribe.h
+++ b/src/gnmi/subscribe.h
@@ -1,5 +1,6 @@
/*
* Copyright 2020 Yohan Pipereau
+ * Copyright 2025 Graphiant Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,9 +19,11 @@
#define _GNMI_SUBSCRIBE_H
#include
+#include
-#include
+#include
#include "encode/encode.h"
+#include "utils/sysrepo.h"
using namespace gnmi;
using google::protobuf::RepeatedPtrField;
@@ -31,31 +34,62 @@ using grpc::StatusCode;
namespace impl {
+class SrModuleOnChangeParams;
+
class Subscribe {
public:
- Subscribe(sysrepo::S_Session sess, std::shared_ptr encode)
- : sr_sess(sess), encodef(encode) {}
+ Subscribe(sysrepo::Session sess)
+ : sr_sess(sess)
+ {
+ encodef = std::make_shared(sr_sess);
+ }
~Subscribe() {}
Status run(ServerContext* context,
ServerReaderWriter* stream);
+ void streamWorker(ServerContext* context, SubscribeRequest request,
+ ServerReaderWriter* stream,
+ boost::asio::io_context &initial_update_io,
+ boost::asio::io_context &incr_update_io);
+ void triggerSampleUpdate(
+ ServerContext* context, Subscription &sub,
+ ServerReaderWriter* stream);
+ Status BuildSubscribeNotification(Notification *notification,
+ const SubscriptionList& request,
+ bool *sample=nullptr);
+ Status BuildSubscribeNotificationForChanges(Notification *notification,
+ const SubscriptionList& request,
+ string& xpath,
+ sysrepo::Session session);
+ // To synchronize write access to the stream
+ void Write(ServerReaderWriter