Skip to content

Commit

Permalink
Add mux_frame_type_name UDF to ease developing mux pxl scripts
Browse files Browse the repository at this point in the history
Summary:
This adds a mux_frame_type_name UDF function that will be used in #607. It seems there is prior art for mysql, amqp and other protocols.

Closes #609.

Test Plan: New unit tests pass

Reviewers: zasgar, philkuz

Reviewed By: philkuz

Signed-off-by: Dom Del Nano <[email protected]>

Differential Revision: https://phab.corp.pixielabs.ai/D12340

GitOrigin-RevId: b1b54121dbf80f84fbadd646ffe8cde205286564
  • Loading branch information
Dom Del Nano authored and copybaranaut committed Oct 5, 2022
1 parent 05df293 commit 3b58f99
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 0 deletions.
72 changes: 72 additions & 0 deletions src/carnot/funcs/protocols/mux.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2018- The Pixie Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once

#include <string>

namespace px {
namespace carnot {
namespace funcs {
namespace protocols {
namespace mux {

inline std::string FrameTypeName(int frame_type) {
switch (frame_type) {
case 1:
return "Treq";
case -1:
return "Rreq";
case 2:
return "Tdispatch";
case -2:
return "Rdispatch";
case 64:
return "Tdrain";
case -64:
return "Rdrain";
case 65:
return "Tping";
case -65:
return "Rping";
case 66:
return "Tdiscarded";
case -66:
return "Rdiscarded";
case 67:
return "Tlease";
case 68:
return "Tinit";
case -68:
return "Rinit";
case -128:
return "Rerr";
case 127:
return "Rerr (legacy)";
case -62:
return "Tdiscarded (legacy)";
default:
return absl::Substitute("Unknown ($0)", frame_type);
}
}

} // namespace mux

} // namespace protocols
} // namespace funcs
} // namespace carnot
} // namespace px
6 changes: 6 additions & 0 deletions src/carnot/funcs/protocols/protocol_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "src/carnot/funcs/protocols/amqp.h"
#include "src/carnot/funcs/protocols/http.h"
#include "src/carnot/funcs/protocols/kafka.h"
#include "src/carnot/funcs/protocols/mux.h"
#include "src/carnot/funcs/protocols/mysql.h"
#include "src/carnot/funcs/protocols/protocols.h"
#include "src/carnot/udf/registry.h"
Expand All @@ -42,6 +43,7 @@ void RegisterProtocolOpsOrDie(px::carnot::udf::Registry* registry) {
registry->RegisterOrDie<MySQLCommandNameUDF>("mysql_command_name");
registry->RegisterOrDie<AMQPFrameTypeUDF>("amqp_frame_type_name");
registry->RegisterOrDie<AMQPMethodTypeUDF>("amqp_method_name");
registry->RegisterOrDie<MuxFrameTypeUDF>("mux_frame_type_name");
}

types::StringValue ProtocolNameUDF::Exec(FunctionContext*, Int64Value protocol) {
Expand Down Expand Up @@ -69,6 +71,10 @@ types::StringValue MySQLCommandNameUDF::Exec(FunctionContext*, Int64Value api_ke
return mysql::CommandName(api_key.val);
}

types::StringValue MuxFrameTypeUDF::Exec(FunctionContext*, Int64Value frame_type) {
return mux::FrameTypeName(frame_type.val);
}

} // namespace protocols
} // namespace funcs
} // namespace carnot
Expand Down
13 changes: 13 additions & 0 deletions src/carnot/funcs/protocols/protocol_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ class AMQPMethodTypeUDF : public px::carnot::udf::ScalarUDF {
}
};

class MuxFrameTypeUDF : public px::carnot::udf::ScalarUDF {
public:
StringValue Exec(FunctionContext*, Int64Value frame_type);

static udf::ScalarUDFDocBuilder Doc() {
return udf::ScalarUDFDocBuilder("Convert a Mux frame type to its name.")
.Details("UDF to convert Mux frame type into their corresponding human-readable names.")
.Arg("frame_type", "A Mux frame_type in numeric value")
.Example("df.frame_type_name = px.mux_frame_type_name(df.req_cmd)")
.Returns("The mux frame type name.");
}
};

void RegisterProtocolOpsOrDie(px::carnot::udf::Registry* registry);

} // namespace protocols
Expand Down
8 changes: 8 additions & 0 deletions src/carnot/funcs/protocols/protocol_ops_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ TEST(ProtocolOps, AMQPMethodTypeUDF) {
udf_tester.ForInput(60, 40).Expect("BasicPublish");
}

TEST(ProtocolOps, MuxFrameTypeUDF) {
auto udf_tester = udf::UDFTester<MuxFrameTypeUDF>();
udf_tester.ForInput(1).Expect("Treq");
udf_tester.ForInput(-1).Expect("Rreq");
udf_tester.ForInput(68).Expect("Tinit");
udf_tester.ForInput(-68).Expect("Rinit");
}

} // namespace protocols
} // namespace funcs
} // namespace carnot
Expand Down

0 comments on commit 3b58f99

Please sign in to comment.