diff --git a/Tobegin.txt b/Tobegin.txt new file mode 100644 index 00000000..a9d2da18 --- /dev/null +++ b/Tobegin.txt @@ -0,0 +1 @@ +1. Run "pip install -r requirements.txt" before runing the compiler. \ No newline at end of file diff --git a/compiler/element/__init__.py b/compiler/element/__init__.py index fe350218..4d248ce5 100644 --- a/compiler/element/__init__.py +++ b/compiler/element/__init__.py @@ -10,6 +10,11 @@ NativeContext, NativeGenerator, ) +from compiler.element.backend.eBPF.finalizer import finalize as eBPFFinalize +from compiler.element.backend.eBPF.nativegen import ( + eBPFContext, + eBPFGenerator, +) from compiler.element.backend.istio_wasm.analyzer import ( AccessAnalyzer as WasmAccessAnalyzer, ) @@ -86,7 +91,7 @@ def gen_code( raise ValueError(f"Method {method_name} not found in {proto_path}.") # proto = os.path.basename(proto_path).replace(".proto", "") proto = extract_proto_package_name(proto_path) - + print("backend_name =", backend_name) assert backend_name in ( "mrpc", "grpc", @@ -94,6 +99,7 @@ def gen_code( "sidecar_native", "ambient_wasm", "ambient_native", + "eBPF" ) compiler = ElementCompiler() @@ -160,6 +166,21 @@ def gen_code( tag=tag, envoy_verbose=envoy_verbose, ) + elif "eBPF" in backend_name: + generator = eBPFGenerator(placement) + finalize = eBPFFinalize + ctx = eBPFContext( + proto=proto, + method_name=method_name, + request_message_name=request_message_name, + response_message_name=response_message_name, + message_field_types=message_field_types, + mode="eBPF", + element_name=output_name, + tag=tag, + # envoy_verbose=envoy_verbose, + ) + print("eBPF backend") printer = Printer() @@ -199,10 +220,11 @@ def gen_code( consolidated.accept(GoAccessAnalyzer(placement), ctx) elif "native" in backend_name: pass - + print("Before consolidated.accept(generator, ctx)") + print("type(consolidated) =", type(consolidated)) # Second pass to generate the code consolidated.accept(generator, ctx) - + # Finalize the generated code finalize(output_name, ctx, output_dir, placement, proto_path) diff --git a/compiler/element/backend/eBPF/.gitignore b/compiler/element/backend/eBPF/.gitignore new file mode 100644 index 00000000..3b1f75e2 --- /dev/null +++ b/compiler/element/backend/eBPF/.gitignore @@ -0,0 +1 @@ +template/ \ No newline at end of file diff --git a/compiler/element/backend/eBPF/appnettype.py b/compiler/element/backend/eBPF/appnettype.py new file mode 100644 index 00000000..28f6a4b0 --- /dev/null +++ b/compiler/element/backend/eBPF/appnettype.py @@ -0,0 +1,258 @@ +# Created and used by Envoy native backend. + + +from typing import Optional + +from compiler.element.backend.eBPF.nativetype import RPC as NativeRPC +from compiler.element.backend.eBPF.nativetype import Bool as NativeBool +from compiler.element.backend.eBPF.nativetype import Bytes as NativeBytes +from compiler.element.backend.eBPF.nativetype import Float as NativeFloat +from compiler.element.backend.eBPF.nativetype import Int as NativeInt +from compiler.element.backend.eBPF.nativetype import Map as NativeMap +from compiler.element.backend.eBPF.nativetype import NativeType, NativeVariable +from compiler.element.backend.eBPF.nativetype import Option as NativeOption +from compiler.element.backend.eBPF.nativetype import Pair as NativePair +from compiler.element.backend.eBPF.nativetype import String as NativeString +from compiler.element.backend.eBPF.nativetype import ( + Timepoint as NativeTimepoint, +) +from compiler.element.backend.eBPF.nativetype import UInt as NativeUInt +from compiler.element.backend.eBPF.nativetype import UInt32 as NativeUInt32 +from compiler.element.backend.eBPF.nativetype import Vec as NativeVec +from compiler.element.logger import ELEMENT_LOG as LOG + +DEFAULT_DECORATOR = { + "consistency": "None", + "combiner": "None", + "persistence": "None", +} + +# Every expression, state and temporary variables in AppNet belong to a AppNetType. +# This is the base class for all types in AppNet. + + +class AppNetType: + def __init__(self, decorator: dict[str, str] = DEFAULT_DECORATOR): + self.decorator = decorator + + def to_native(self) -> NativeType: # type: ignore + LOG.error(f"to_native not implemented for {self}") + assert 0 + + def is_basic(self) -> bool: + return self.is_arithmetic() or self.is_bool() + + def is_arithmetic(self) -> bool: + return ( + isinstance(self, Int) or isinstance(self, UInt) or isinstance(self, Float) + ) + + def is_float(self) -> bool: + return isinstance(self, Float) + + def is_string(self) -> bool: + return isinstance(self, String) + + def is_string_literal(self) -> bool: + return isinstance(self, String) and self.literal is not None + + def is_bool(self) -> bool: + return isinstance(self, Bool) + + def is_same(self, other) -> bool: + return type(self) == type(other) + + def is_map(self) -> bool: + return isinstance(self, Map) + + def is_vec(self) -> bool: + return isinstance(self, Vec) + + def is_option(self) -> bool: + return isinstance(self, Option) + + def is_int(self) -> bool: + return isinstance(self, Int) + + def is_uint(self) -> bool: + return isinstance(self, UInt) + + def is_pair(self) -> bool: + return isinstance(self, Pair) + + def is_vec(self) -> bool: + return isinstance(self, Vec) + + +def appnet_type_from_str(name: str) -> AppNetType: + match name.lower(): + case "int": + return Int() + case "uint": + return UInt() + case "float": + return Float() + case "string": + return String() + case "bool": + return Bool() + case "bytes": + return Bytes() + case "instant": + return Instant() + case _: + if name.startswith("<") and name.endswith(">"): + # maybe a pair type: + inner = name[1:-1].split(",") + if len(inner) == 2: + return Pair( + appnet_type_from_str(inner[0].strip()), + appnet_type_from_str(inner[1].strip()), + ) + else: + raise Exception(f"Unknown type {name} when converting from string") + elif name.startswith("Vec"): + return Vec(appnet_type_from_str(name[4:-1])) + + raise Exception(f"Unknown type {name} when converting from string") + + +class RPC(AppNetType): + def to_native(self) -> NativeType: + return NativeRPC() + + +class Int(AppNetType): + def to_native(self) -> NativeType: + return NativeInt() + +class UInt32(AppNetType): + def to_native(self) -> NativeType: + return NativeUInt32() + +class UInt(AppNetType): + def to_native(self) -> NativeType: + return NativeUInt() + + +class Float(AppNetType): + def to_native(self) -> NativeType: + # return NativeFloat() + return NativeUInt() + + +class String(AppNetType): + literal: Optional[ + str + ] # Sometimes we have get(rpc, 'load') where 'load' is a string literal. + + def to_native(self) -> NativeType: + return NativeString() + + def __init__(self, literal: Optional[str] = None): + super().__init__() + self.literal = literal + + +class Bool(AppNetType): + def to_native(self) -> NativeType: + return NativeBool() + + +class Bytes(AppNetType): + def to_native(self) -> NativeType: + return NativeBytes() + + +class Instant(AppNetType): + def to_native(self) -> NativeType: + return NativeTimepoint() + + +class Option(AppNetType): # return type of get(Map, ...) + inner: AppNetType + + def __init__(self, inner: AppNetType): + super().__init__() + self.inner = inner + + def to_native(self) -> NativeType: + return NativeOption(self.inner.to_native()) + + +class Map(AppNetType): + key: AppNetType + value: AppNetType + + def __init__( + self, + key: AppNetType, + value: AppNetType, + decorator: dict[str, str] = DEFAULT_DECORATOR, + ): + super().__init__(decorator) + self.key = key + self.value = value + print(f"Map key: {key}, value: {value}") + + def to_native(self) -> NativeType: + return NativeMap(self.key.to_native(), self.value.to_native()) + + +class Vec(AppNetType): + type: AppNetType + + def __init__(self, type: AppNetType): + super().__init__() + self.type = type + + def to_native(self) -> NativeType: + return NativeVec(self.type.to_native()) + + +class Void(AppNetType): + def to_native(self) -> NativeType: # type: ignore + LOG.error(f"AppNet Void type cannot be converted to native type") + assert 0 + + +class Pair(AppNetType): + first: AppNetType + second: AppNetType + + def __init__(self, first: AppNetType, second: AppNetType): + super().__init__() + self.first = first + self.second = second + + def to_native(self) -> NativeType: + return NativePair(self.first.to_native(), self.second.to_native()) + + +class AppNetVariable: + name: str + type: AppNetType + + native_var: Optional[NativeVariable] + + def __init__(self, name: str, type: AppNetType): + self.name = name + self.type = type + + +def proto_type_to_appnet_type(proto_type: str) -> AppNetType: + match proto_type: + case "int32", "int64", "uint32", "uint64", "sint32", "sint64", "fixed32", "fixed64", "sfixed32", "sfixed64": + return Int() + case "float", "double": + return Float() + case "string": + return String() + case "bool": + return Bool() + case "bytes": + return Bytes() + case "google.protobuf.Timestamp": + return Instant() + case _: + raise Exception(f"Unknown proto type {proto_type}") diff --git a/compiler/element/backend/eBPF/finalizer.py b/compiler/element/backend/eBPF/finalizer.py new file mode 100644 index 00000000..6e537df4 --- /dev/null +++ b/compiler/element/backend/eBPF/finalizer.py @@ -0,0 +1,323 @@ +import os +from typing import Dict + +from compiler.config import COMPILER_ROOT +from compiler.element.backend.eBPF.nativegen import eBPFContext +from compiler.element.logger import ELEMENT_LOG as LOG + +from compiler.element.backend.eBPF.getkubernetes import * + +from compiler.element.backend.eBPF.types import * + +# def gen_map_access(name : str, idx : int, ctx) -> str: +# """ +# u32 {src/dst}Pod{idx}IP_key = 0; +# u32 {src/dst}Pod{idx}IP = 0; +# u32 *{src/dst}Pod{idx}Value = {src/dst}_pod.lookup(&{src/dst}Pod{idx}IP_key); +# if ({src/dst}Pod{idx}Value) { +# {src/dst}Pod{idx}IP = (*{src/dst}Pod{idx}Value); +# } +# """ +# ret_val = "" +# ret_val += f"u32 {name}Pod{idx}IP_key = 0;\n" +# ret_val += f"u32 {name}Pod{idx}IP = 0;\n" +# ret_val += f"u32 *{name}Pod{idx}Value = {name}_pod.lookup(&{name}Pod{idx}IP_key);\n" +# ret_val += f"if ({name}Pod{idx}Value) {{\n" +# ret_val += f" {name}Pod{idx}IP = (*{name}Pod{idx}Value);\n" +# ret_val += f"}}\n" +# return ret_val + +def codegen_from_template(output_dir, ctx: eBPFContext, lib_name, proto_path): + print(f"in codegen_from_template(), output_dir = {output_dir}") + print(f"in codegen_from_template(), ctx = {ctx}") + template_path = f"{COMPILER_ROOT}/element/backend/eBPF/template" + + # check if the template directory exists, if not, git clone. + if os.path.exists(template_path) == False or len(os.listdir(template_path)) == 0: + # git clone from git@github.com:appnet-org/envoy-appnet.git, master branch + os.system( + f"git clone git@github.com:appnet-org/envoy-appnet.git {template_path} --branch master" + ) + LOG.info(f"New template cloned from git repo to {template_path}") + + # check if the output directory exists, if not, copy the template to the output directory + # if the directory exists and non-empty, just rewrite the appnet_filter/appnet_filter.cc file and its .h file + if os.path.exists(output_dir) == False or len(os.listdir(output_dir)) == 0: + os.system(f"mkdir -p {output_dir}") + os.system( + f"bash -c 'cp -r {COMPILER_ROOT}/element/backend/eBPF/template/{{.,}}* {output_dir}'" + ) + LOG.info( + f"New template copied from {COMPILER_ROOT}/element/backend/eBPF/template to {output_dir}" + ) + else: + os.system(f"rm -f {output_dir}/appnet_filter/appnet_filter.cc") + os.system(f"rm -f {output_dir}/appnet_filter/appnet_filter.h") + os.system(f"rm -f {output_dir}/appnet_filter/appnet_filter_config.cc") + os.system( + f"cp {COMPILER_ROOT}/element/backend/eBPF/template/appnet_filter/appnet_filter.cc {output_dir}/appnet_filter/appnet_filter.cc" + ) + os.system( + f"cp {COMPILER_ROOT}/element/backend/eBPF/template/appnet_filter/appnet_filter.h {output_dir}/appnet_filter/appnet_filter.h" + ) + os.system( + f"cp {COMPILER_ROOT}/element/backend/eBPF/template/appnet_filter/appnet_filter_config.cc {output_dir}/appnet_filter/appnet_filter_config.cc" + ) + + # if ctx.on_tick_code != []: + # # acquire the global_state_lock + # ctx.on_tick_code = [ + # "std::unique_lock lock(global_state_lock);" + # ] + ctx.on_tick_code + + # if ctx.envoy_verbose: # type: ignore + # ctx.insert_envoy_log() + print(f"ctx.global_var_def = {ctx.global_var_def}, ctx.init_code = {ctx.init_code}, ctx.req_hdr_code = {ctx.req_hdr_code}, ctx.req_body_code = {ctx.req_body_code}") + eBPF_func_name = output_dir.split('/')[-1] + # TODO: get edgeN1, edgeN2 from the graph + edgeN1 = "frontend" + edgeN1Idx = 0 + conditionEdgeN1 = "" + edgeN2 = "server" + edgeN2Idx = 0 + conditionEdgeN2 = "" + user_space_init = f''' +def ip_to_hex(ip): + parts = ip.split(".") + return (int(parts[0]) << 24) | (int(parts[1]) << 16) | (int(parts[2]) << 8) | int(parts[3]) +def get_kubernetes_info(): + config.load_kube_config() + v1 = client.CoreV1Api() + namespace = "default" + services = v1.list_namespaced_service(namespace) + pods = v1.list_namespaced_pod(namespace) + services = v1.list_namespaced_service(namespace) + pod_map = {{pod.metadata.name: pod.metadata.labels for pod in pods.items}} + service_pod_mapping = {{}} + for svc in services.items: + svc_name = svc.metadata.name + svc_ip = svc.spec.cluster_ip + svc_selector = svc.spec.selector + if not svc_selector: + continue + label_selector = ",".join([f"{{k}}={{v}}" for k, v in svc_selector.items()]) + matching_pods = [ + pod_name for pod_name, labels in pod_map.items() + if labels and all(labels.get(k) == v for k, v in svc_selector.items()) + ] + service_pod_mapping[svc_name] = matching_pods + return service_pod_mapping, services, pods + +def update_pod_info_periodically(): + while True: + try: + service_pod_mapping, services, pods = get_kubernetes_info() + current_mapping_src_pod = {{}} + current_mapping_dst_pod = {{}} + for svc, all_pods in service_pod_mapping.items(): + for i in range(len(all_pods)): + for pod in pods.items: + if all_pods[i] == pod.metadata.name and pod.status.pod_ip: + ip_int = ip_to_hex(pod.status.pod_ip) + if svc == "{edgeN1}": + current_mapping_src_pod[ctypes.c_uint(ip_int)] = ctypes.c_uint(i) + elif svc == "{edgeN2}": + current_mapping_dst_pod[ctypes.c_uint(ip_int)] = ctypes.c_uint(i) + else: + continue + service_pod_map = b["src_pod"] + for ip_int, idx in current_mapping_src_pod.items(): + if service_pod_map.get(ip_int) != idx: + service_pod_map[ctypes.c_uint(ip_int)] = ctypes.c_uint(idx) + print("Update pod map") + for ip_int in list(service_pod_map.keys()): + if ip_int not in current_mapping: + del service_pod_map[ctypes.c_uint(ip_int)] + print("Update pod map") + + service_pod_map = b["dst_pod"] + for ip_int, idx in current_mapping_dst_pod.items(): + if service_pod_map.get(ip_int) != idx: + service_pod_map[ctypes.c_uint(ip_int)] = ctypes.c_uint(idx) + print("Update pod map") + for ip_int in list(service_pod_map.keys()): + if ip_int not in current_mapping_dst_pod: + del service_pod_map[ctypes.c_uint(ip_int)] + print("Update pod map") + time.sleep(10) + except Exception as e: + print(f"Failed to update pod info: {{e}}") + time.sleep(10) +''' +# service_pod_mapping, services, pods = get_kubernetes_info() +# for svc, all_pods in service_pod_mapping.items(): +# if svc == "{edgeN1}": +# service_pod_map = b["src_pod"] +# elif svc == "{edgeN2}": +# service_pod_map = b["dst_pod"] +# for i in range(len(all_pods)): +# for pod in pods.items: +# if all_pods[i] == pod.metadata.name: +# service_pod_map[ctypes.c_uint32(ip_to_hex(pod.status.pod_ip))] = ctypes.c_uint32(i) +# break ''' + # TODO: set interface as a parameter + curr_interface = "cni0" + user_space_running = f''' +interface = "{curr_interface}" +b.attach_xdp(interface, b.load_func("{eBPF_func_name}", bcc.BPF.XDP)) +print(f"{eBPF_func_name} eBPF program attached to {{interface}}...") +thread = threading.Thread(target=update_pod_info_periodically, daemon=True) +thread.start() +try: + while True: + time.sleep(5) + b.trace_print() +except KeyboardInterrupt: + print("Detaching eBPF program") + b.remove_xdp(interface) +''' + POPULATEKUBERNETESStr = "" + definitionEdgeN1 = "" + definitionEdgeN2 = "" + service_pod_mapping, services, pods = get_kubernetes_info() + # Check whether the src/dst IP belongs to the pod IPs of the service + for svc, all_pods in service_pod_mapping.items(): + print("svc=", svc, "all_pods=", all_pods) + if svc == edgeN1: + print("come here") + new_map_var = AppNetMap(AppNetUInt32(), AppNetUInt32()) + _, decl = ctx.declareeBPFVar("src_pod", new_map_var.to_native()) + ctx.push_global_var_def(decl) + definitionEdgeN1 = "u32 *src_pod_value = src_pod.lookup(&src_ip);" + # for i in range(len(all_pods)): + # for pod in pods.items: + # if all_pods[i] == pod.metadata.name: + # POPULATEKUBERNETESStr += gen_map_access(name="src", idx=edgeN1Idx, ctx=ctx) + # if conditionEdgeN1 == "": + # conditionEdgeN1 = f"((src_ip == srcPod{edgeN1Idx}IP)" + # else: + # conditionEdgeN1 += f"|| (src_ip == srcPod{edgeN1Idx}IP)" + # edgeN1Idx += 1 + # break + elif svc == edgeN2: + new_map_var = AppNetMap(AppNetUInt32(), AppNetUInt32()) + _, decl = ctx.declareeBPFVar("dst_pod", new_map_var.to_native()) + ctx.push_global_var_def(decl) + definitionEdgeN2 = "u32 *dst_pod_value = dst_pod.lookup(&dst_ip);" + # for i in range(len(all_pods)): + # for pod in pods.items: + # if all_pods[i] == pod.metadata.name: + # POPULATEKUBERNETESStr += gen_map_access(name="dst", idx=edgeN2Idx, ctx=ctx) + # if conditionEdgeN2 == "": + # conditionEdgeN2 = f"((dst_ip == dstPod{edgeN2Idx}IP)" + # else: + # conditionEdgeN2 += f"|| (dst_ip == dstPod{edgeN2Idx}IP)" + # edgeN2Idx += 1 + # break + # if conditionEdgeN1 != "": + # conditionEdgeN1 += ")" + # if conditionEdgeN2 != "": + # conditionEdgeN2 += ")" + if definitionEdgeN1 != "" and definitionEdgeN2 != "": + POPULATEKUBERNETESStr += "u32 src_ip = bpf_ntohl(ip->saddr);\n" + "u32 dst_ip = bpf_ntohl(ip->daddr);\n" + POPULATEKUBERNETESStr += "bpf_trace_printk(\"Src ip is %u\\\\n\", src_ip);\n" + POPULATEKUBERNETESStr += "bpf_trace_printk(\"Dst ip is %u\\\\n\", dst_ip);\n" + POPULATEKUBERNETESStr += f"{definitionEdgeN1}\n" + POPULATEKUBERNETESStr += f"{definitionEdgeN2}\n" + POPULATEKUBERNETESStr += "if (!src_pod_value || !dst_pod_value) {return XDP_PASS; }\n" + print(f"POPULATEKUBERNETESStr = {POPULATEKUBERNETESStr}") + + replace_dict = { + "// !APPNET_BEG": + ["import bcc"] + + ["from bcc import BPF"] + + ["import socket"] + + ["import ctypes"] + + ["from kubernetes import client, config"] + + ["import time"] + + ["import threading"] + + ["bpf_code = \'\'\'"] + + ["#include "] + + ["#include "] + + ["#include "] + + ["#include "] + + ["#include "] + + ["#include "] + + ["#include "], + "// !APPNET_STATE": ctx.global_var_def, + "// !APPNET_INIT": + ["b = BPF(text=bpf_code)"] + + [user_space_init] + + ctx.init_code + + [user_space_running], + "// !APPNET_REQUEST": + # ["{ // req header begin. "] + # + ctx.req_hdr_code + # + ["} // req header end."] + [f"int {eBPF_func_name}(struct xdp_md *ctx)"] + + ["{ // req body begin. "] + + ["void *data = (void *)(long)ctx->data;"] + + ["void *data_end = (void *)(long)ctx->data_end;"] + + ["struct ethhdr *eth = data;"] + + ["if ((void *)(eth + 1) > data_end)"] + + [" return XDP_PASS; // If the packet is too short to have an Ethernet header, pass"] + + ["if (eth->h_proto != htons(ETH_P_IP)) {"] + + [" return XDP_PASS;"] + + ["}"] + + ["struct iphdr *ip = (struct iphdr *)(eth + 1);"] + + ["if ((void *)(ip + 1) > data_end)"] + + [" return XDP_PASS;"] + + [POPULATEKUBERNETESStr] + + ["bpf_trace_printk(\"Hitting appnet program\\\\n\");"] + + ctx.req_body_code + + ["} // req body end.\'\'\'"], + "// !APPNET_RESPONSE": [] + # "// !APPNET_RESPONSE": ["{ // resp header begin. "] + # + ctx.resp_hdr_code + # + ["} // resp header end."] + # + ["{ // resp body begin. "] + # + ctx.resp_body_code + # + ["} // resp body end."], + # "// !APPNET_ONTICK": ctx.on_tick_code, + } + + # rewrite appnet_filter/appnet_filter.cc according to the replace dict + with open(f"{output_dir}/appnet_filter/appnet_filter.cc", "r") as file: + appnet_filter = file.read() + print(f"Initial appnet_filter = {appnet_filter}") + for key, value in replace_dict.items(): + appnet_filter = appnet_filter.replace(key, "\n".join(value)) + print(f"After replace appnet_filter = {appnet_filter}") + print(f"{output_dir}/appnet_filter/appnet_filter.py") + with open(f"{output_dir}/appnet_filter/appnet_filter.py", "w") as file: + file.write(appnet_filter) + + # remove .git to prevent strange bazel build behavior + os.system(f"rm -rf {output_dir}/.git") + + # clang format + os.system(f"clang-format -i {output_dir}/appnet_filter/appnet_filter.cc") + + # TODO: smarter way to define unique symbol name + # Rename base64_encode and base64_decode to avoid symbol confliction + files = [ + os.path.join(output_dir, "appnet_filter/appnet_filter.cc"), + os.path.join(output_dir, "appnet_filter/thirdparty/base64.h"), + ] + for filename in files: + with open(filename, "r") as f: + content = f.read() + content = content.replace("base64_encode", f"base64_encode_{lib_name}") + content = content.replace("base64_decode", f"base64_decode_{lib_name}") + with open(filename, "w") as f: + f.write(content) + + LOG.info( + f"Backend code for {lib_name} generated. You can find the source code at {output_dir}" + ) + + +def finalize( + name: str, ctx: eBPFContext, output_dir: str, placement: str, proto_path: str +): + codegen_from_template(output_dir, ctx, name, proto_path) diff --git a/compiler/element/backend/eBPF/getkubernetes.py b/compiler/element/backend/eBPF/getkubernetes.py new file mode 100644 index 00000000..49d98242 --- /dev/null +++ b/compiler/element/backend/eBPF/getkubernetes.py @@ -0,0 +1,67 @@ +from kubernetes import client, config + +def ip_to_hex(ip): + """Convert IP string (e.g., 10.244.0.4) to hex integer.""" + parts = ip.split(".") + return (int(parts[0]) << 24) | (int(parts[1]) << 16) | (int(parts[2]) << 8) | int(parts[3]) + +'''Get IPs for all services and pods; get the mapping from service to all its pods''' +def get_kubernetes_info(): + # Load Kubernetes config (use `config.load_incluster_config()` if running inside a cluster) + config.load_kube_config() + + v1 = client.CoreV1Api() + + namespace = "default" # Change this to your namespace + + # Get all services in the namespace + services = v1.list_namespaced_service(namespace) + + # Get all pods in the namespace + pods = v1.list_namespaced_pod(namespace) + + # for pod in pods.items: + # print(f"Pod: {pod.metadata.name}, IP: {pod.status.pod_ip}", "hex_ip", ip_to_hex(pod.status.pod_ip)) + + # Get all services in the same namespace + services = v1.list_namespaced_service(namespace) + # for svc in services.items: + # print(f"Service: {svc.metadata.name}, Cluster IP: {svc.spec.cluster_ip}", "hex_ip", ip_to_hex(svc.spec.cluster_ip)) + + + # Create a mapping of pod name -> labels + pod_map = {pod.metadata.name: pod.metadata.labels for pod in pods.items} + + # Iterate over each service to find matching pods + service_pod_mapping = {} + + for svc in services.items: + svc_name = svc.metadata.name + svc_ip = svc.spec.cluster_ip + svc_selector = svc.spec.selector + + if not svc_selector: + # print(f"Service {svc_name} has no selectors (it may be an ExternalName service).") + continue + + # Convert service selectors to key=value format + label_selector = ",".join([f"{k}={v}" for k, v in svc_selector.items()]) + + # Find matching pods + matching_pods = [ + pod_name for pod_name, labels in pod_map.items() + if labels and all(labels.get(k) == v for k, v in svc_selector.items()) + ] + + service_pod_mapping[svc_name] = matching_pods + return service_pod_mapping, services, pods + +service_pod_mapping, services, pods = get_kubernetes_info() +for svc, all_pods in service_pod_mapping.items(): + print(f"svc={svc}, all_pods={all_pods}") + print("svc=frontend is", svc=="frontend") + for i in range(len(all_pods)): + for pod in pods.items: + if all_pods[i] == pod.metadata.name: + print(f"all_pods[i] = {all_pods[i]}, ip_to_hex(pod.status.pod_ip) = {ip_to_hex(pod.status.pod_ip)}") + # break \ No newline at end of file diff --git a/compiler/element/backend/eBPF/nativegen.py b/compiler/element/backend/eBPF/nativegen.py new file mode 100644 index 00000000..d1832a95 --- /dev/null +++ b/compiler/element/backend/eBPF/nativegen.py @@ -0,0 +1,2799 @@ +from copy import deepcopy +from typing import Dict, List, Optional, Set + +from compiler.element.backend.eBPF.appnettype import DEFAULT_DECORATOR +from compiler.element.backend.eBPF.types import * +from compiler.element.backend.eBPF.types import NativeVariable +from compiler.element.frontend.printer import Printer +from compiler.element.logger import ELEMENT_LOG as LOG +from compiler.element.node import * +from compiler.element.node import Identifier, Pattern +from compiler.element.visitor import Visitor + +import traceback +class eBPFContext: + def __init__( + self, + proto=None, + method_name=None, + request_message_name=None, + response_message_name=None, + message_field_types: Optional[dict[str, dict[str, str]]] = None, + mode: str = "eBPF", + element_name: str = "", + tag: str = "0", + # envoy_verbose: bool = False, + ) -> None: + self.appnet_var: list[dict[str, AppNetVariable]] = [ + {} + ] # The first scope is the states. + self.native_var: list[dict[str, NativeVariable]] = [ + {} + ] # The first scope is the global scope + self.global_var_def: list[str] = [] + self.init_code: list[str] = [] + self.on_tick_code: list[str] = [] + self.req_hdr_code: list[str] = [] + self.req_body_code: list[str] = [] + self.resp_hdr_code: list[str] = [] + self.resp_body_code: list[str] = [] + + self.current_procedure: str = "" + self.current_procedure_code: list[str] = [] + + self.tmp_cnt: int = 0 + + assert message_field_types is not None + self.message_field_types: dict[str, dict[str, str]] = message_field_types + + # Dirty hack for get_rpc_header type inference + self.most_recent_assign_left_type: Optional[AppNetType] = None + + # self.envoy_verbose = envoy_verbose + self.global_state_lock_held = False + + def print_content(self): + print(f"self.appnet_var = {self.appnet_var}") + print(f"self.native_var = {self.native_var}") + + print(f"self.global_var_def = {self.global_var_def}") + print(f"self.init_code = {self.init_code}") + + print(f"self.on_tick_code = {self.on_tick_code}") + print(f"self.req_hdr_code = {self.req_hdr_code}") + + print(f"self.req_body_code = {self.req_body_code}") + print(f"self.resp_hdr_code = {self.resp_hdr_code}") + print(f"self.resp_body_code = {self.resp_body_code}") + + + # self.current_procedure: str = "" + # self.current_procedure_code: list[str] = [] + + # self.tmp_cnt: int = 0 + + # assert message_field_types is not None + # self.message_field_types: dict[str, dict[str, str]] = message_field_types + + # # Dirty hack for get_rpc_header type inference + # self.most_recent_assign_left_type: Optional[AppNetType] = None + + # # self.envoy_verbose = envoy_verbose + # self.global_state_lock_held = False) + + + def insert_envoy_log(self) -> None: + # Insert ENVOY_LOG(info, stmt) after each statement in req and resp + # This is for debugging. + # Then we will have 2 times more lines of code. + + def process(codes: List[str]) -> List[str]: + new_code = [] + for stmt in codes: + if ( + "for" in stmt + or "if" in stmt + or "else" in stmt + or "while" in stmt + or "{" in stmt + or "}" in stmt + ) == False: + trans_stmt = stmt.replace('"', '\\"') + new_code.append(f'ENVOY_LOG(warn, "{trans_stmt}");') + + new_code.append(stmt) + + return new_code + + self.req_hdr_code = process(self.req_hdr_code) + self.req_body_code = process(self.req_body_code) + self.resp_hdr_code = process(self.resp_hdr_code) + self.resp_body_code = process(self.resp_body_code) + + def push_appnet_scope(self) -> None: + self.appnet_var.append({}) + + def pop_appnet_scope(self) -> None: + popped = self.appnet_var.pop() + LOG.debug(f"appnet scope popped {popped}") + + def push_native_scope(self) -> None: + self.native_var.append({}) + + def pop_native_scope(self) -> None: + self.native_var.pop() + + def push_code(self, code: str) -> None: + code = code.strip() + self.current_procedure_code.append(code) + if code.endswith("{"): + self.push_native_scope() + + if code.startswith("}"): + self.pop_native_scope() + + def push_global_var_def(self, code: str) -> None: + self.global_var_def.append(code) + + def new_temporary_name(self) -> str: + self.tmp_cnt += 1 + return f"temp_{self.tmp_cnt}" + + def declareAppNetLocalVariable( + self, + name: str, + rtype: AppNetType, + ) -> AppNetVariable: + if name in self.appnet_var[-1]: + raise Exception(f"variable {name} already declared as a local variable") + self.appnet_var[-1][name] = AppNetVariable(name, rtype) + return self.appnet_var[-1][name] + + def declareAppNetState( + self, + name: str, + rtype: AppNetType, + decorator: dict[str, str] = DEFAULT_DECORATOR, + ) -> AppNetVariable: + if name in self.appnet_var[0]: + raise Exception(f"variable {name} already declared") + + rtype.decorator = decorator + self.appnet_var[0][name] = AppNetVariable(name, rtype) + return self.appnet_var[0][name] + + def declareeBPFVar( + self, name: str, rtype: NativeType, local: bool = True + ) -> Tuple[NativeVariable, str]: + # Declare a native var in the current scope. + # It will return the declared variable and the declaration statement. + # The declaration statement should be pushed to the current procedure code by the caller manually. + if name in self.native_var[-1]: + raise Exception(f"variable {name} already declared") + self.native_var[-1][name] = NativeVariable(name, rtype, local) + if local: + return (self.native_var[-1][name], rtype.gen_decl_local(name)) + return (self.native_var[-1][name], rtype.gen_decl(name)) + + def find_native_var(self, name: str) -> NativeVariable: + for scope in reversed(self.native_var): + if name in scope: + return scope[name] + raise Exception(f"variable {name} not found") + + def get_appnet_var(self, name: str) -> Tuple[AppNetVariable, bool]: + for scope in reversed(self.appnet_var): + if name in scope: + return (scope[name], False) + if name in self.appnet_var[0]: + return (self.appnet_var[0][name], True) + raise Exception(f"variable {name} not found") + + def find_appnet_var(self, name: str) -> Optional[Tuple[AppNetVariable, bool]]: + print(f"find_appnet_var: name = {name}") + try: + print(f"self.get_appnet_var(name) = {self.get_appnet_var(name)}") + return self.get_appnet_var(name) + except: + return None + + def getHeaderPath(self): + return ( + "decoder_callbacks_->requestHeaders()" + if self.current_procedure == "req" + else "encoder_callbacks_->responseHeaders()" + ) + + def genBlockingHttpRequest(self, cluster_name: str, url: NativeVariable) -> None: + + assert url.type.is_string() + assert self.current_procedure in ["req", "resp", "init"] + + self.push_code("{") + self.push_code(f' ENVOY_LOG(info, "[AppNet Filter] Blocking HTTP Request");') + self.push_code(f" Awaiter http_awaiter = Awaiter();") + self.push_code(f" this->http_awaiter_ = &http_awaiter;") + self.push_code(f" this->external_response_ = nullptr;") + self.push_code( + f' this->sendHttpRequest("{cluster_name}", {url.name}, *this);' + ) + self.push_code(f" assert(this->appnet_coroutine_.has_value());") + self.push_code( + f' ENVOY_LOG(info, "[AppNet Filter] Blocking HTTP Request Sent");' + ) + + if self.global_state_lock_held: + # we need to release the lock. Otherwise, we will have a deadlock. + self.push_code(f" lock.unlock();") + self.push_code(f" co_await http_awaiter;") + if self.global_state_lock_held: + # we need to re-acquire the lock. + self.push_code(f" lock.lock();") + + self.push_code( + f' ENVOY_LOG(info, "[AppNet Filter] Blocking HTTP Request Done");' + ) + self.push_code("}") + + def genBlockingWebdisRequest(self, url: NativeVariable) -> None: + self.genBlockingHttpRequest("webdis_cluster", url) + + def genNonBlockingWebdisRequest(self, url: NativeVariable) -> None: + assert url.type.is_string() + assert self.current_procedure in ["req", "resp", "init"] + + self.push_code("{") + self.push_code( + f' ENVOY_LOG(info, "[AppNet Filter] Non-Blocking Webdis Request");' + ) + # make sure url start with SET + self.push_code(f" this->sendWebdisRequest({url.name}, *empty_callback_);") + self.push_code("}") + + def get_current_msg_fields(self) -> dict[str, str]: + assert self.current_procedure in ["req", "resp"] + if self.current_procedure == "req": + return self.message_field_types["request"] + else: + return self.message_field_types["response"] + + def get_callback_name(self): + return ( + "this->decoder_callbacks_" + if self.current_procedure == "req" + else "this->encoder_callbacks_" + ) + + +class eBPFGenerator(Visitor): + def __init__(self, placement: str) -> None: + self.placement = placement + if placement not in ["client", "server", "ambient"]: + raise ValueError(f"invalid placement {placement}") + + def visitNode(self, node: Node, ctx: eBPFContext): + return node.__class__.__name__ + + def visitProgram(self, node: Program, ctx: eBPFContext) -> None: + print("Enter visitProgram") + node.definition.accept(self, ctx) + node.init.accept(self, ctx) + node.req.accept(self, ctx) + node.resp.accept(self, ctx) + print("Exit visitProgram") + + def visitState(self, node: State, ctx: eBPFContext) -> None: + print("Enter visitState") + print(f"node = {node}") + # Iterate through all state variables and declare them + for (identifier, type, cons, comb, per) in node.state: + assert per.name == "None" or per.name == "true" or per.name == "false" + appType: AppNetType = type.accept(self, ctx) + decorator = { + "consistency": cons.name, + "combiner": comb.name, + "persistence": per.name, + } + state = ctx.declareAppNetState(identifier.name, appType, decorator) + native_var, decl = ctx.declareeBPFVar( + identifier.name, state.type.to_native(), False + ) + print(f"decl = {decl}") + state.native_var = native_var + ctx.push_global_var_def(decl) + + # ===== Consistency Part ===== + assert decorator["consistency"] in ["strong", "weak", "None"] + if decorator["consistency"] == "None": + continue + + if appType.is_map() == False: + raise Exception( + "Only map type can have consistency for now" + ) + + assert isinstance(appType, AppNetMap) + if appType.key.is_string() == False or ( + appType.value.is_string() == False and appType.value.is_int() == False + ): + raise Exception( + "Only map can have consistency for now" + ) + + # if decorator["consistency"] in ["weak"]: + # if ( + # appType.key.is_string() == False + # or appType.value.is_string() == False + # ): + # raise Exception( + # "Only map can have weak consistency for now" + # ) + # # for (auto& [key, value] : cache) { + # # ENVOY_LOG(info, "[AppNet Filter] cache key={}, value={}", key, value); + # # } + # # this->sendWebdisRequest(const std::string path, int &callback) + # # path="/MGET/a/b/c/d" to get a,b,c,d + # # path="/MSET/a/b/c/d" to set a to b, c to d + + # ctx.on_tick_code.append("{") + # # We simulate the overhead. Just set the value to the key, and then get them back from webdis. + # ctx.on_tick_code.append(f' std::string geturl = "/MGET";') + # ctx.on_tick_code.append(f' std::string seturl = "/MSET";') + # ctx.on_tick_code.append(f" for (auto& [key, value] : {state.name}) {{") + # ctx.on_tick_code.append(f' geturl += "/" + key;') + # ctx.on_tick_code.append( + # f' seturl += "/" + key + "/" + base64_encode(value, true);' + # ) + # ctx.on_tick_code.append(f" }}") + # ctx.on_tick_code.append(f"this->sendWebdisRequest(seturl);") + # # TODO: We should wait for the response of the set request, and then we can send the get request. + # ctx.on_tick_code.append(f" this->sendWebdisRequest(geturl);") + # ctx.on_tick_code.append("}") + print("Exit visitState") + + def visitProcedure(self, node: Procedure, ctx: eBPFContext): + print("Enter visitProcedure") + ctx.current_procedure = node.name + ctx.current_procedure_code = [] + # self.appnet_var = [{'prob': }, {}] + ctx.push_appnet_scope() + # self.native_var = [{'prob': }, {}] + ctx.push_native_scope() + + ctx.global_state_lock_held = False + # if len(ctx.appnet_var[0]) > 0: + # # TODO: We do very coarse-grained locking here. + # # If we have global states, we serialize all the request handling. + # if ctx.current_procedure != "init": + # # init() already has the lock in tempalte, for init global variable. + # ctx.push_code("std::unique_lock lock(global_state_lock);") + # ctx.global_state_lock_held = True + + if node.name == "init": + assert len(node.params) == 0 + else: + assert node.name == "req" or node.name == "resp" + # assert(len(node.params) == 1) + assert node.params[0].name == "rpc" + app_rpc = ctx.declareAppNetLocalVariable("rpc", AppNetRPC()) + native_rpc, decl = ctx.declareeBPFVar("rpc", app_rpc.type.to_native()) + app_rpc.native_var = native_rpc + buffer_name = ( + "this->request_buffer_" + if node.name == "req" + else "this->response_buffer_" + ) + # tmp_data_buf_name = ctx.new_temporary_name() + # ctx.push_code(decl) + # ctx.push_code( + # f"std::vector {tmp_data_buf_name}({buffer_name}->length());" + # ) + # ctx.push_code( + # f"{buffer_name}->copyOut(0, {buffer_name}->length(), {tmp_data_buf_name}.data());" + # ) + # ctx.push_code( + # f"{native_rpc.name}.ParseFromArray({tmp_data_buf_name}.data() + 5, {tmp_data_buf_name}.size() - 5);" + # ) + + for stmt in node.body: + stmt.accept(self, ctx) + + if node.name == "init": + ctx.init_code = ctx.current_procedure_code + elif node.name == "req": + ctx.req_body_code = ctx.current_procedure_code + elif node.name == "resp": + ctx.resp_body_code = ctx.current_procedure_code + else: + raise Exception("unknown procedure") + + ctx.pop_appnet_scope() + ctx.pop_native_scope() + + assert len(ctx.native_var) == 1 + print("Exit visitProcedure") + + def visitForeach(self, node: Foreach, ctx: eBPFContext): + vec_name = node.var.name + lambda_arg_name = node.func.arg.name + + ctx.push_appnet_scope() + + vec, is_state = ctx.get_appnet_var(vec_name) + vec_type = vec.type + assert isinstance(vec_type, AppNetVec) + inner_type = vec_type.type + + ctx.push_code( + f"for ({inner_type.to_native().type_name()} {lambda_arg_name} : {vec_name}) {{" + ) + iter_var_appnet = ctx.declareAppNetLocalVariable(lambda_arg_name, inner_type) + iter_var_native, _decl = ctx.declareeBPFVar( + lambda_arg_name, inner_type.to_native() + ) + iter_var_appnet.native_var = iter_var_native + + for stmt in node.func.body: + stmt.accept(self, ctx) + + ctx.pop_appnet_scope() + + ctx.push_code("}") + + def visitStatement(self, node: Statement, ctx: eBPFContext): + # if ctx.envoy_verbose: + # try: + # LOG.info(f"statement {node.accept(Printer(), ctx=None)}") + # except Exception as e: + # LOG.info(f"statement {node}, fail to parse") + + # A statement may be translated into multiple C++ statements. + # These statements will be attached to ctx.current_procedure_code directly. + print("Enter visitStatement") + if isinstance(node.stmt, Foreach): + return self.visitForeach(node.stmt, ctx) + pass + elif node.stmt is None: + ctx.push_code("; // empty statement") + elif isinstance(node.stmt, Send): + # TODO: deal with Up and Down + # TODO: support hooks in addition to the XDP hook + if isinstance(node.stmt.msg, Error): + ctx.push_code(f"bpf_trace_printk(\"PKT DROP decision\\\\n\");") + ctx.push_code(f"return XDP_DROP;") + else: + assert isinstance(node.stmt.msg, Identifier), "currently do not support types beyond Error and Identifier" + ctx.push_code(f"return XDP_PASS;") + elif isinstance(node.stmt, Assign): + if ctx.current_procedure != "init": + ctx.push_code(f"// stmt {node.stmt}") + ctx.print_content() + retval = node.stmt.accept(self, ctx) + ctx.print_content() + if not isinstance(retval, list): + retval = [retval] + print("Exit visitStatement") + return retval + elif (isinstance(node.stmt, Match) + or isinstance(node.stmt, Expr) + ): + ctx.push_code(f"// stmt {node.stmt}") + ctx.print_content() + retval = node.stmt.accept(self, ctx) + print("After") + ctx.print_content() + # exit(0) + if not isinstance(retval, list): + retval = [retval] + print("Exit visitStatement") + return retval + + else: + raise Exception("unknown statement") + + def visitIdentifier(self, node: Identifier, ctx) -> str: + return node.name + + def generateOptionMatch( + self, + node: Match, + ctx: eBPFContext, + appnet_type: AppNetOption, + native_expr: NativeVariable, + ) -> None: + + some_pattern = None + some_pattern_stmts = None + + none_pattern = None + none_pattern_stmts = None + + assert len(node.actions) == 2 # Option type must have two patterns + + for pattern, stmts in node.actions: + if pattern.some == True: + some_pattern = pattern + some_pattern_stmts = stmts + else: + none_pattern = pattern + none_pattern_stmts = stmts + + if some_pattern is None or none_pattern is None: + raise Exception("Some and None patterns must be both present") + + # Generate arms. + + assert some_pattern is not None + assert none_pattern is not None + assert some_pattern_stmts is not None + assert none_pattern_stmts is not None + + assert isinstance(none_pattern.value, Literal) + none_appnet_type, none_embed_str = none_pattern.value.accept(self, ctx) + assert none_embed_str == "None" + + ctx.push_code(f"if ({native_expr.name}.has_value())") + ctx.push_appnet_scope() + ctx.push_code("{") + + assert isinstance(some_pattern.value, Identifier) + bind_name: str = some_pattern.value.accept(self, ctx) + bind_app_var = ctx.declareAppNetLocalVariable(bind_name, appnet_type.inner) + bind_native_var, decl = ctx.declareeBPFVar( + bind_name, appnet_type.inner.to_native() + ) + bind_app_var.native_var = bind_native_var + ctx.push_code(decl) + + ctx.push_code(f"{bind_name} = {native_expr.name}.value();") + for stmt in some_pattern_stmts: + stmt.accept(self, ctx) + ctx.push_code("}") + ctx.pop_appnet_scope() + + ctx.push_code("else") + ctx.push_appnet_scope() + ctx.push_code("{") + + for stmt in none_pattern_stmts: + stmt.accept(self, ctx) + + ctx.pop_appnet_scope() + ctx.push_code("}") + + def visitMatch(self, node: Match, ctx: eBPFContext) -> None: + print("Enter visitMatch") + print(f"type(node.expr) = {type(node.expr)}, node.actions = {node.actions}") + # print(f"node.expr.lhs = {node.expr.lhs}, node.expr.op = {node.expr.op}, node.expr.rhs = {node.expr.rhs}") + appnet_type, native_expr = self.visitGeneralExpr(node.expr, ctx) + print(f"appnet_type = {appnet_type}, native_expr = {native_expr}") + if isinstance(native_expr.type, NativeOption): + assert isinstance(appnet_type, AppNetOption) + self.generateOptionMatch(node, ctx, appnet_type, native_expr) + return + # ====== Generate basic types match (no binding) ==== + # exit(0) + first = True + + empty_pattern = None + empty_pattern_stmts = None + + for pattern, stmts in node.actions: + assert pattern.some == False + if isinstance(pattern.value, Identifier): + raise Exception("Only Some(x) binding is supported") + else: + assert isinstance(pattern.value, Literal) + pattern_appnet_type, pattern_embed_str = pattern.value.accept(self, ctx) + print(f"pattern_appnet_type={pattern_appnet_type}, pattern_embed_str={pattern_embed_str}") + if pattern_embed_str == "_": + if empty_pattern is not None: + raise Exception( + "Only one empty pattern is allowed in a match statement" + ) + empty_pattern = pattern + empty_pattern_stmts = stmts + continue + if first == False: + ctx.push_code("else") + ctx.push_code(f"if ({native_expr.name} == {pattern_embed_str})") + ctx.push_appnet_scope() + ctx.push_code("{") + for stmt in stmts: + stmt.accept(self, ctx) + ctx.push_code("}") + ctx.pop_appnet_scope() + first = False + if empty_pattern is not None: + assert empty_pattern_stmts is not None + if first == True: + raise Exception("Remove redundant empty pattern please") + ctx.push_code("else") + ctx.push_appnet_scope() + ctx.push_code("{") + for stmt in empty_pattern_stmts: + stmt.accept(self, ctx) + ctx.push_code("}") + ctx.pop_appnet_scope() + + ctx.pop_appnet_scope() + + def visitGeneralExpr( + self, node, ctx: eBPFContext + ) -> Tuple[AppNetType, NativeVariable]: + print("Enter visitGeneralExpr") + if isinstance(node, Literal): + rhs_appnet_type, embed_str = node.accept(self, ctx) + print(f"type(rhs_appnet_type) = {type(rhs_appnet_type)}, rhs_appnet_type = {rhs_appnet_type}, type(embed_str) = {type(embed_str)}, embed_str = {embed_str}") + rhs_native_var, decl = ctx.declareeBPFVar( + ctx.new_temporary_name(), rhs_appnet_type.to_native() + ) + # Create a temporary variable to store the value of the literal + ctx.push_code(decl) + ctx.push_code(f"{rhs_native_var.name} = {embed_str};") + + elif isinstance(node, Identifier): + rhs = None + res = ctx.find_appnet_var(node.name) + + if res is not None: + rhs, _is_state = res + elif node.name in ctx.appnet_var[0]: + rhs = ctx.appnet_var[0][node.name] + elif node.name == "inf": + # Special case. + rhs = ctx.declareAppNetLocalVariable(node.name, AppNetInt()) + rhs_native_var, decl = ctx.declareeBPFVar( + ctx.new_temporary_name(), NativeInt() + ) + ctx.push_code(decl) + # set it to the maximum value of int + ctx.push_code( + f"{rhs_native_var.name} = std::numeric_limits::max();" + ) + rhs.native_var = rhs_native_var + elif node.name == "inf_f": + rhs = ctx.declareAppNetLocalVariable(node.name, AppNetFloat()) + rhs_native_var, decl = ctx.declareeBPFVar( + ctx.new_temporary_name(), NativeFloat() + ) + ctx.push_code(decl) + # set it to the maximum value of float + ctx.push_code( + f"{rhs_native_var.name} = std::numeric_limits::max();" + ) + rhs.native_var = rhs_native_var + else: + print(f"node.name = {node.name}") + raise Exception(f"unknown variable {node.name}") + + assert rhs is not None + assert rhs.native_var is not None + rhs_native_var = rhs.native_var + rhs_appnet_type = rhs.type + + elif isinstance(node, Expr): # Maybe subclass of Expr + # print(f"node.lhs = {node.lhs}, node.op = {node.op}, node.rhs = {node.rhs}") + print("isinstance(node, Expr)") + if isinstance(node, FuncCall): + print(f"node.name = {node.name}, node.args = {node.args}") + print(node.__str__()) + rhs_appnet_type, rhs_native_var = node.accept(self, ctx) + + else: + raise Exception(f"unknown right hand side type={node.__class__.__name__}") + + assert isinstance(rhs_appnet_type, AppNetType) + assert isinstance(rhs_native_var, NativeVariable) + print("Exit visitGeneralExpr") + return (rhs_appnet_type, rhs_native_var) + + def visitAssign(self, node: Assign, ctx: eBPFContext) -> None: + print("Enter visitAssign") + assert isinstance(node.left, Identifier) or isinstance(node.left, Pair) + if ctx.current_procedure == "init": + # b = BPF(text=bpf_code) + # prob = b["prob"] + # prob[ctypes.c_uint(0)] = ctypes.c_uint(50) + # if ctx.current_procedure == "init": + # if ctx.find_appnet_var(lhs_name): + # ctx.push_code(f"{lhs_name}[ctypes.c_uint(0)] = {rhs_native_var.name};") + # print(f"{lhs_name} = {rhs_native_var.name};") + # exit(0) + if isinstance(node.left, Identifier) and isinstance(node.right, Literal): + lhs_name = node.left.name + rhs_appnet_type, rhs_native_var = node.right.accept(self, ctx) + if ctx.find_appnet_var(lhs_name): + ctx.push_code(f"{lhs_name} = b[\"{lhs_name}\"]") + ctx.push_code(f"{lhs_name}[ctypes.c_uint(0)] = ctypes.c_uint({rhs_native_var})") + else: + print(f"node.left = {node.left}, node.right = {node.right}") + assert False, "New init case" + else: + if isinstance(node.left, Identifier): + print("isinstance(node.left, Identifier)") + lhs_name = node.left.name + rhs_appnet_type, rhs_native_var = self.visitGeneralExpr(node.right, ctx) + assert isinstance(rhs_appnet_type, AppNetType) + assert isinstance(rhs_native_var, NativeVariable) + print("lhs_name = ", lhs_name) + if ctx.find_appnet_var(lhs_name): + res = ctx.find_appnet_var(f"{lhs_name}_key") + print(f"lhs_name = {lhs_name}, res = {res}") + if res is None: + print("res is None") + rhs = ctx.declareAppNetLocalVariable(f"{lhs_name}_key", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"{lhs_name}_key", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + print("res is not None") + # ctx.push_code(f"u32 {lhs_name}_key = 0;") + res = ctx.find_appnet_var(f"*{lhs_name}_val") + if res is None: + print("res is None") + rhs = ctx.declareAppNetLocalVariable(f"*{lhs_name}_val", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"*{lhs_name}_val", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + # TODO: should avoid writing sth. like *{lhs_name}_val = {rhs_native_var.name}; + # ctx.push_code(f"{lhs_name}_val = {lhs_name}.lookup(&{lhs_name}_key);") + # ctx.push_code(f"if ({lhs_name}_val) {{") + # ctx.push_code(f" *{lhs_name}_val = {rhs_native_var.name};") + # ctx.push_code(f"}}") + ctx.push_code(f"*{lhs_name}_val = {rhs_native_var.name};") + ctx.push_code(f"{lhs_name}.update(&{lhs_name}_key, {lhs_name}_val);") + + else: + rhs = ctx.declareAppNetLocalVariable(f"{lhs_name}", rhs_appnet_type) + eBPF_rhs, decl = ctx.declareeBPFVar(f"{lhs_name}", rhs.type.to_native()) + # print(f"dddddddddd {lhs_name} = {rhs_native_var.name};") + print(f"dddddddd decl = {decl}, {lhs_name} = {rhs_native_var.name};") + ctx.push_code(decl) + ctx.push_code(f"{lhs_name} = {rhs_native_var.name};") + # if ctx.find_appnet_var(lhs_name) is None: + # # This is a new local variable. + # LOG.debug(f"new local variable {lhs_name}") + + # # Eval the right side. + # rhs_appnet_type, rhs_native_var = self.visitGeneralExpr(node.right, ctx) + # assert isinstance(rhs_appnet_type, AppNetType) + # assert isinstance(rhs_native_var, NativeVariable) + + # lhs = ctx.declareAppNetLocalVariable(lhs_name, rhs_appnet_type) + # lhs_native, decl = ctx.declareeBPFVar( + # lhs_name, rhs_appnet_type.to_native() + # ) + # lhs.native_var = lhs_native + # ctx.push_code( + # f"{lhs.native_var.type.type_name()} {lhs.name} = {rhs_native_var.name};" + # ) + + # else: + # # Assigning the existing variable. + + # lhs = ctx.get_appnet_var(lhs_name)[0] + + # ctx.most_recent_assign_left_type = lhs.type + # rhs_appnet_type, rhs_native_var = self.visitGeneralExpr(node.right, ctx) + # ctx.most_recent_assign_left_type = None + + # assert isinstance(rhs_appnet_type, AppNetType) + # assert isinstance(rhs_native_var, NativeVariable) + # assert lhs.native_var is not None + # assert lhs.native_var.type.is_same(rhs_native_var.type) + + # ctx.push_code(f"{lhs.name} = {rhs_native_var.name};") + + elif isinstance(node.left, Pair): + + rhs_appnet_type, rhs_native_var = self.visitGeneralExpr(node.right, ctx) + + assert isinstance(rhs_appnet_type, AppNetType) + assert isinstance(rhs_native_var, NativeVariable) + + assert isinstance(node.left.first, Identifier) and isinstance( + node.left.second, Identifier + ) + assert isinstance(rhs_appnet_type, AppNetPair) + first_name = node.left.first.name + second_name = node.left.second.name + + # In pair assignment, we only support declaring new local variables. + assert ctx.find_appnet_var(first_name) is None + assert ctx.find_appnet_var(second_name) is None + + first_var_appnet = ctx.declareAppNetLocalVariable( + first_name, rhs_appnet_type.first + ) + second_var_appnet = ctx.declareAppNetLocalVariable( + second_name, rhs_appnet_type.second + ) + + first_var_native, first_decl = ctx.declareeBPFVar( + first_name, rhs_appnet_type.first.to_native() + ) + second_var_native, second_decl = ctx.declareeBPFVar( + second_name, rhs_appnet_type.second.to_native() + ) + + first_var_appnet.native_var = first_var_native + second_var_appnet.native_var = second_var_native + + ctx.push_code(first_decl) + ctx.push_code(second_decl) + + ctx.push_code(f"{first_var_native.name} = {rhs_native_var.name}.first;") + ctx.push_code(f"{second_var_native.name} = {rhs_native_var.name}.second;") + + pass + else: + raise Exception("should not reach here") + print("Exit visitAssign") + + def acceptable_oper_type( + self, lhs: AppNetType, op: Operator, rhs: AppNetType + ) -> bool: + if op in [ + Operator.ADD, + Operator.SUB, + Operator.MUL, + Operator.DIV, + Operator.GT, + Operator.LT, + Operator.GE, + Operator.LE, + ]: + if lhs.is_arithmetic() and rhs.is_arithmetic(): + return True + return False + + def visitPair( + self, node: Pair, ctx: eBPFContext + ) -> Tuple[AppNetType, NativeVariable]: + assert isinstance(node, Pair) + + first_appnet_type, first_native_var = self.visitGeneralExpr(node.first, ctx) + second_appnet_type, second_native_var = self.visitGeneralExpr(node.second, ctx) + + assert isinstance(first_appnet_type, AppNetType) + assert isinstance(second_appnet_type, AppNetType) + + new_pair_type = AppNetPair(first_appnet_type, second_appnet_type) + appnet_pair = ctx.declareAppNetLocalVariable( + ctx.new_temporary_name(), new_pair_type + ) + native_pair, decl = ctx.declareeBPFVar( + appnet_pair.name, new_pair_type.to_native() + ) + ctx.push_code(decl) + ctx.push_code(f"{native_pair.name}.first = {first_native_var.name};") + ctx.push_code(f"{native_pair.name}.second = {second_native_var.name};") + + return (new_pair_type, native_pair) + + # A temporary native variable will be generated to store the result of the expression. + def visitExpr( + self, node: Expr, ctx: eBPFContext + ) -> Tuple[AppNetType, NativeVariable]: + if isinstance(node, Pair): + return self.visitPair(node, ctx) + + assert isinstance(node, Expr) + + lhs_appnet_type, lhs_nativevar = self.visitGeneralExpr(node.lhs, ctx) + rhs_appnet_type, rhs_nativevar = self.visitGeneralExpr(node.rhs, ctx) + + assert isinstance(lhs_appnet_type, AppNetType) + assert isinstance(rhs_appnet_type, AppNetType) + assert isinstance(lhs_nativevar, NativeVariable) + assert isinstance(rhs_nativevar, NativeVariable) + + # Make sure they are the same type. We don't support type conversion for now. + + assert lhs_appnet_type.is_same(rhs_appnet_type) or self.acceptable_oper_type( + lhs_appnet_type, node.op, rhs_appnet_type + ) + # assert(lhs_nativevar.type.is_same(rhs_nativevar.type)) + + def get_expr_type( + op: Operator, lhs_type: AppNetType, rhs_type: AppNetType + ) -> AppNetType: + if lhs_type.is_basic() and rhs_type.is_basic(): + if op in [Operator.ADD, Operator.SUB, Operator.MUL, Operator.DIV]: + assert lhs_type.is_arithmetic() and rhs_type.is_arithmetic() + return ( + AppNetFloat() + if lhs_type.is_float() or rhs_type.is_float() + else AppNetInt() + ) + + if op in [ + Operator.EQ, + Operator.NEQ, + Operator.LT, + Operator.GT, + Operator.LE, + Operator.GE, + ]: + assert ( + lhs_type.is_same(rhs_type) + or lhs_type.is_arithmetic() + and rhs_type.is_arithmetic() + ) + if lhs_type.is_bool(): + assert op in [Operator.EQ, Operator.NEQ] + return AppNetBool() + else: + # String == and != are supported + if op in [Operator.EQ, Operator.NEQ]: + assert lhs_type.is_string() and rhs_type.is_string() + return AppNetBool() + else: + raise Exception("unsupported operator") + raise Exception("unknown operator") + + expr_appnet_type = get_expr_type(node.op, lhs_appnet_type, rhs_appnet_type) + + new_var, decl = ctx.declareeBPFVar( + ctx.new_temporary_name(), expr_appnet_type.to_native() + ) + ctx.push_code(decl) + # add appnet variable + if ctx.find_appnet_var(lhs_nativevar.name) and ctx.find_appnet_var(rhs_nativevar.name): + res = ctx.find_appnet_var(f"{lhs_nativevar.name}_key") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"{lhs_nativevar.name}_key", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"{lhs_nativevar.name}_key", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + # add pointer + res = ctx.find_appnet_var(f"*{lhs_nativevar.name}_val") + if res is None: + print("res is None") + rhs = ctx.declareAppNetLocalVariable(f"*{lhs_nativevar.name}_val", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"*{lhs_nativevar.name}_val", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + res = ctx.find_appnet_var(f"{rhs_nativevar.name}_key") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"{rhs_nativevar.name}_key", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"{rhs_nativevar.name}_key", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + res = ctx.find_appnet_var(f"*{rhs_nativevar.name}_val") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"*{rhs_nativevar.name}_val", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"*{rhs_nativevar.name}_val", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + assign_code = f''' +{lhs_nativevar.name}_val = {lhs_nativevar.name}.lookup(&{lhs_nativevar.name}_key); +{rhs_nativevar.name}_val = {rhs_nativevar.name}.lookup(&{rhs_nativevar.name}_key); +if ({lhs_nativevar.name}_val && {rhs_nativevar.name}_val) {{ + {new_var.name} = (*{lhs_nativevar.name}_val) {node.op.accept(self, ctx)} (*{rhs_nativevar.name}_val); +}}''' + elif ctx.find_appnet_var(lhs_nativevar.name): + res = ctx.find_appnet_var(f"{lhs_nativevar.name}_key") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"{lhs_nativevar.name}_key", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"{lhs_nativevar.name}_key", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + res = ctx.find_appnet_var(f"*{lhs_nativevar.name}_val") + if res is None: + print("res is None") + rhs = ctx.declareAppNetLocalVariable(f"*{lhs_nativevar.name}_val", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"*{lhs_nativevar.name}_val", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + assign_code = f''' +{lhs_nativevar.name}_val = {lhs_nativevar.name}.lookup(&{lhs_nativevar.name}_key); +if ({lhs_nativevar.name}_val) {{ + {new_var.name} = (*{lhs_nativevar.name}_val) {node.op.accept(self, ctx)} {rhs_nativevar.name}; +}}''' + elif ctx.find_appnet_var(rhs_nativevar.name): + res = ctx.find_appnet_var(f"{rhs_nativevar.name}_key") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"{rhs_nativevar.name}_key", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"{rhs_nativevar.name}_key", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + res = ctx.find_appnet_var(f"*{rhs_nativevar.name}_val") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"*{rhs_nativevar.name}_val", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"*{rhs_nativevar.name}_val", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + assign_code = f''' +{rhs_nativevar.name}_val = {rhs_nativevar.name}.lookup(&{rhs_nativevar.name}_key); +if ({rhs_nativevar.name}_val) {{ + {new_var.name} = {lhs_nativevar.name} {node.op.accept(self, ctx)} (*{rhs_nativevar.name}_val); +}}''' + else: + assign_code = f"{new_var.name} = {lhs_nativevar.name} {node.op.accept(self, ctx)} {rhs_nativevar.name};" + # LEFT_NAME = "" + # if ctx.find_appnet_var(lhs_nativevar.name): + # # TODO: consider various members in the array. Currently, we let index to be always 0 + # # u32 prob_val_key = 0; + # # u32 *prob_val = prob.lookup(&prob_val_key); + + # LEFT_NAME = lhs_nativevar.name + "[0]" + # else: + # LEFT_NAME = lhs_nativevar.name + # RIGHT_NAME = "" + # if ctx.find_appnet_var(rhs_nativevar.name): + # RIGHT_NAME = rhs_nativevar.name + "[0]" + # else: + # RIGHT_NAME = rhs_nativevar.name + # print(f"LEFT_NAME = {LEFT_NAME}, RIGHT_NAME = {RIGHT_NAME}") + # assign_code = f"{new_var.name} = {LEFT_NAME} {node.op.accept(self, ctx)} {RIGHT_NAME};" + ctx.push_code(assign_code) + + return (expr_appnet_type, new_var) + + def visitOperator(self, node: Operator, ctx: eBPFContext) -> str: + print("Enter visitOperator") + if node == Operator.ADD: + return "+" + elif node == Operator.SUB: + return "-" + elif node == Operator.MUL: + return "*" + elif node == Operator.DIV: + return "/" + elif node == Operator.EQ: + return "==" + elif node == Operator.NEQ: + return "!=" + elif node == Operator.LT: + print("Exit visitOperator") + return "<" + elif node == Operator.GT: + return ">" + elif node == Operator.LE: + return "<=" + elif node == Operator.GE: + return ">=" + else: + raise Exception("unknown operator") + + def visitType(self, node: Type, ctx: eBPFContext) -> AppNetType: + if node.name == "int": + return AppNetInt() + elif node.name == "uint": + return AppNetUInt() + elif node.name == "float": + return AppNetFloat() + elif node.name == "string": + return AppNetString() + elif node.name == "bool": + return AppNetBool() + elif node.name == "Instant": + return AppNetInstant() + elif node.name.startswith("Map"): + # For now, we only support 3 types of map: + # Map + # Map> + # Map> + print(node.name) + if node.name.count(",") == 1: + # Map / Map> + keytype_str = node.name[4:-1].split(",")[0].strip() + valuetype_str = node.name[4:-1].split(",")[1].strip() + key_type = appnet_type_from_str(keytype_str) + value_type = appnet_type_from_str(valuetype_str) + return AppNetMap(key_type, value_type) + else: + # Map> + # keytype + keytype_str = node.name[4:-1].strip()[ + node.name[4:-1].find("<") + 1 : node.name[4:-1].rfind(",") + ] + # + valuepair_str = node.name[4:-1][ + node.name[4:-1].find("<") : node.name[4:-1].rfind(">") + 1 + ] + + key_type = appnet_type_from_str(keytype_str) + value_type = appnet_type_from_str(valuepair_str) + return AppNetMap(key_type, value_type) + elif node.name.startswith("Vec"): + # Vec + valuetype_str = node.name[4:-1].strip() + value_type = appnet_type_from_str(valuetype_str) + return AppNetVec(value_type) + else: + print(node.name) + raise Exception("unknown type") + + def genGeneralFuncCall( + self, + fname: str, + node: MethodCall, + args: List[Tuple[AppNetType, NativeVariable]], + ctx: eBPFContext, + ) -> Tuple[AppNetType, NativeVariable]: + for func in APPNET_BUILTIN_FUNCS: + func_instance: AppNetBuiltinFuncProto = func() + if func_instance.appnet_name != fname: + continue + + if fname == "metaget": + for arg in node.args: + if isinstance(arg, Literal) and "meta" in arg.value: + func_instance.set_field_name(arg.value) + + # Check if the arguments match + if not func_instance.instantiate([arg[0] for arg in args]): + continue + + # Generate the code + ret_native_var = func_instance.gen_code(ctx, *[arg[1] for arg in args]) + return (func_instance.ret_type(), ret_native_var) + + raise Exception( + f"unknown function {fname}. Parameters matching failed: {[arg[0] for arg in args]}" + ) + + def visitFuncCall( + self, node: FuncCall, ctx: eBPFContext + ) -> Tuple[AppNetType, Optional[NativeVariable]]: + args = [self.visitGeneralExpr(arg, ctx) for arg in node.args] + fname = node.name.name + return self.genGeneralFuncCall(fname, node, args, ctx) + + def visitMethodCall( + self, node: MethodCall, ctx: eBPFContext + ) -> Tuple[AppNetType, Optional[NativeVariable]]: + + fname = node.method.name.lower() + args = [self.visitGeneralExpr(arg, ctx) for arg in node.args] + + obj_name = node.obj.name + obj_appnet_var, is_global_state = ctx.get_appnet_var(obj_name) + + assert obj_appnet_var.native_var is not None + return self.genGeneralFuncCall( + fname, node, [(obj_appnet_var.type, obj_appnet_var.native_var)] + args, ctx + ) + + def visitSend(self, node: Send, ctx: eBPFContext): + # Down: cluster side + # Up: client side + # Request: Up --> Down + # Response: Down --> Up + return + match ctx.current_procedure: + case "req": + if node.direction == "Up": + # Make sure it's something like send(err('msg'), Up) + if isinstance(node.msg, Error): + assert node.msg.msg.type == DataType.STR + assert node.msg.msg.value != "" + # Forbidden 403 + + ctx.push_code( + "std::function modify_headers = [](ResponseHeaderMap& headers) {" + ) + ctx.push_code( + ' headers.addCopy(LowerCaseString("grpc-status"), "1");' + ) + ctx.push_code("};") + + ctx.push_code(f"this->req_appnet_blocked_ = true;") + ctx.push_code( + f'this->decoder_callbacks_->sendLocalReply(Http::Code::Forbidden, "{node.msg.msg.value[1:-1]}", modify_headers, absl::nullopt, "");' + ) + ctx.push_code("co_return;") + else: + raise Exception( + "req procedure should only send error message tp Up direction" + ) + elif node.direction == "Down": + ctx.push_code("if (this->in_decoding_or_encoding_ == false) {") + ctx.push_code(f" this->decoder_callbacks_->continueDecoding();") + ctx.push_code("}") + ctx.push_code("co_return;") + else: + raise Exception("unknown direction") + + case "resp": + if node.direction == "Up": + ctx.push_code("if (this->in_decoding_or_encoding_ == false) {") + ctx.push_code(f" this->encoder_callbacks_->continueEncoding();") + ctx.push_code("}") + ctx.push_code("co_return;") + elif node.direction == "Down": + raise NotImplementedError( + "down direction is not supported in resp procedure yet" + ) + else: + raise Exception("unknown direction") + case _: + raise Exception("unknown procedure") + + def visitLiteral(self, node: Literal, ctx: eBPFContext) -> Tuple[AppNetType, str]: + # Return a string that can be embedded in the C++ code directly. + # A literal is a string, int, float, or bool + try: + print("Enter visitLiteral") + print("node.type =", node.type, "node.value =", node.value, "type(node.value) =", type(node.value)) + if node.type == DataType.STR: + # replace ' into " + new_str = node.value.replace("'", '"') + return (AppNetString(node.value[1:-1]), new_str) + elif node.type == DataType.INT: + return (AppNetInt(), str(node.value)) + elif node.type == DataType.FLOAT: + print(f"Come here, str(node.value * 100) = {str(int(float(node.value) * 100))}") + return (AppNetInt(), str(int(float(node.value) * 100))) + # return (AppNetFloat(), str(node.value)) + elif node.type == DataType.BOOL: + return (AppNetBool(), str(node.value).lower()) + else: + types = [ + (int, AppNetInt()), + (float, AppNetFloat()), + (str, AppNetString()), + (bool, AppNetBool()), + ] + for t, appnet_type in types: + # try cast + try: + t(node.value) + LOG.warning(f"cast {node.value} into a {t}") + if appnet_type == AppNetFloat(): + print("return (AppNetInt(), str(node.value * 100))") + return (AppNetInt(), str(node.value * 100)) + return (appnet_type, str(node.value)) + except: + pass + + raise Exception("unknown literal type, and cast failed") + finally: + print("Exit visitLiteral") + + def visitError(self, node: Error, ctx) -> str: + raise NotImplementedError + + +# ======================== BUILD-IN FUNCTIONS ======================== + + +class AppNetBuiltinFuncProto: + def instantiate(self, args: List[AppNetType]) -> bool: + # This function accepts a list of AppNetType and returns a boolean indicating whether the given list of AppNetType is valid for this function + raise NotImplementedError + + def ret_type(self) -> AppNetType: + raise NotImplementedError + + def native_arg_sanity_check(self, native_args: List[NativeVariable]): + # Check the given native arguments is consistent with the app args given by instantiate() + assert self.prepared + assert len(native_args) == len(self.appargs) + for i in range(len(native_args)): + assert native_args[i].type.is_same(self.appargs[i].to_native()) + + def gen_code(self, ctx: eBPFContext, *args) -> NativeVariable: + raise NotImplementedError + + def __init__(self, appnet_name: str, comments: str = ""): + self.appnet_name = appnet_name + self.comments = comments + self.prepared = False + self.appargs = [] + + +class GetMapStrongConsistency(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + ret = ( + len(args) == 2 + and isinstance(args[0], AppNetMap) + and args[0].key.is_same(args[1]) + ) + ret = ret and args[0].decorator["consistency"] == "strong" + if ret: + assert isinstance(args[0], AppNetMap) + assert isinstance(args[1], AppNetType) + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + assert self.prepared + assert isinstance(self.appargs[0], AppNetMap) + return AppNetOption(self.appargs[0].value) + + def gen_code( + self, ctx: eBPFContext, map: NativeVariable, key: NativeVariable + ) -> NativeVariable: + self.native_arg_sanity_check([map, key]) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + ctx.push_code(native_decl_stmt) + + ctx.push_code("{") + ctx.push_code(f"// save variable to {res_native_var.name}") + + # Construct the URL + url_native_var, url_decl_str = ctx.declareeBPFVar( + ctx.new_temporary_name(), NativeString() + ) + ctx.push_code(url_decl_str) + ctx.push_code(f'{url_native_var.name} = "/GET/" + {key.name};') + ctx.genBlockingWebdisRequest(url_native_var) + + # Parse the response + # which is put in ResponseMessagePtr external_response_; + ctx.push_code("if (this->external_response_ == nullptr) {") + ctx.push_code(' ENVOY_LOG(error, "[AppNet Filter] Webdis Request Failed");') + ctx.push_code(" std::terminate();") + ctx.push_code("}") + + ret_type = self.ret_type() + assert isinstance(ret_type, AppNetOption) + assert isinstance(ret_type.inner, AppNetString) or isinstance( + ret_type.inner, AppNetInt + ) + + # return {"GET":null} or {"GET":"value"} + # parse the response using json + ctx.push_code( + f"std::string response_str = this->external_response_->bodyAsString();" + ) + ctx.push_code( + f'ENVOY_LOG(info, "[AppNet Filter] Webdis Response: {{}}", response_str);' + ) + + # nlohmann::json j = nlohmann::json::parse(body); + ctx.push_code(f"nlohmann::json j = nlohmann::json::parse(response_str);") + ctx.push_code(f'if (j.contains("GET") && j["GET"].is_null() == false)') + ctx.push_code("{") + if isinstance(ret_type.inner, AppNetInt): + # get it as string, and then cast to int + ctx.push_code(f'std::string str_int = static_cast(j["GET"]);') + ctx.push_code(f"{res_native_var.name}.emplace(std::stoi(str_int));") + else: + ctx.push_code( + f' {res_native_var.name}.emplace(base64_decode(j["GET"], false));' + ) + ctx.push_code("}") + + ctx.push_code("}") + + return res_native_var + + def __init__(self): + super().__init__("get", "map_strong") + + +class GetMap(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + ret = ( + len(args) == 2 + and isinstance(args[0], AppNetMap) + and args[0].key.is_same(args[1]) + ) + ret = ret and args[0].decorator["consistency"] in ["None", "weak"] + if ret: + assert isinstance(args[0], AppNetMap) + assert isinstance(args[1], AppNetType) + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + assert self.prepared + assert isinstance(self.appargs[0], AppNetMap) + return AppNetOption(self.appargs[0].value) + + def gen_code( + self, ctx: eBPFContext, map: NativeVariable, key: NativeVariable + ) -> NativeVariable: + self.native_arg_sanity_check([map, key]) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + + ctx.push_code(native_decl_stmt) + + ctx.push_code(f"{res_native_var.name} = map_get_opt({map.name}, {key.name});") + return res_native_var + + def __init__(self): + super().__init__("get", "map") + + +class CurrentTime(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + ret = len(args) == 0 + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetInstant() + + def native_arg_sanity_check(self, args: List[NativeVariable]): + assert self.prepared + assert len(args) == 0 + + def gen_code(self, ctx: eBPFContext, args=[]) -> NativeVariable: + self.native_arg_sanity_check(args) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + + ctx.push_code(native_decl_stmt) + ctx.push_code(f"{res_native_var.name} = bpf_ktime_get_ns();") + return res_native_var + + def __init__(self): + super().__init__("current_time") + + +class TimeDiff(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + ret = ( + len(args) == 2 + and isinstance(args[0], AppNetInstant) + and isinstance(args[1], AppNetInstant) + ) + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetFloat() + + def gen_code( + self, ctx: eBPFContext, end: NativeVariable, start: NativeVariable + ) -> NativeVariable: + self.native_arg_sanity_check([end, start]) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + + ctx.push_code(native_decl_stmt) + print("native_decl_stmt =", native_decl_stmt) + # cast into float in second + END_VAR = end.name + START_VAR = start.name + if ctx.find_appnet_var(end.name): + res = ctx.find_appnet_var(f"{end.name}_key") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"{end.name}_key", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"{end.name}_key", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + res = ctx.find_appnet_var(f"*{end.name}_val") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"*{end.name}_val", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"*{end.name}_val", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + ctx.push_code(f"{end.name}_val = {end.name}.lookup(&{end.name}_key);") + END_VAR = f"*{end.name}_val" + if ctx.find_appnet_var(start.name): + res = ctx.find_appnet_var(f"{start.name}_key") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"{start.name}_key", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"{start.name}_key", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + res = ctx.find_appnet_var(f"*{start.name}_val") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"*{start.name}_val", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"*{start.name}_val", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + ctx.push_code(f"{start.name}_val = {start.name}.lookup(&{start.name}_key);") + START_VAR = f"*{start.name}_val" + if END_VAR != end.name and START_VAR != start.name: + ctx.push_code(f"if ({start.name}_val && {end.name}_val) {{") + ctx.push_code( + f"{res_native_var.name} = (({END_VAR}) - ({START_VAR}));" + ) + ctx.push_code(f"}}") + elif END_VAR != end.name: + ctx.push_code(f"if ({end.name}_val) {{") + ctx.push_code( + f"{res_native_var.name} = (({END_VAR}) - ({START_VAR}));" + ) + ctx.push_code(f"}}") + elif START_VAR != start.name: + ctx.push_code(f"if ({start.name}_val) {{") + ctx.push_code( + f"{res_native_var.name} = (({END_VAR}) - ({START_VAR}));" + ) + ctx.push_code(f"}}") + else: + ctx.push_code( + f"{res_native_var.name} = ({END_VAR} - {START_VAR});" + ) + return res_native_var + + def __init__(self): + super().__init__("time_diff", "in_sec") + + +class Min(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + ret = len(args) == 2 and args[0].is_arithmetic() and args[1].is_arithmetic() + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + assert self.prepared + return ( + AppNetFloat() + if self.appargs[0].is_float() or self.appargs[1].is_float() + else AppNetInt() + ) + + def gen_code( + self, ctx: eBPFContext, a: NativeVariable, b: NativeVariable + ) -> NativeVariable: + self.native_arg_sanity_check([a, b]) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + + ctx.push_code(native_decl_stmt) + ANAME = a.name + BNAME = b.name + if ctx.find_appnet_var(a.name): + res = ctx.find_appnet_var(f"{a.name}_key") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"{a.name}_key", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"{a.name}_key", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + res = ctx.find_appnet_var(f"*{a.name}_val") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"*{a.name}_val", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"*{a.name}_val", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + ctx.push_code(code=f"{a.name}_val = {a.name}.lookup(&{a.name}_key);") + ANAME = f"*{a.name}_val" + if ctx.find_appnet_var(b.name): + res = ctx.find_appnet_var(f"{b.name}_key") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"{b.name}_key", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"{b.name}_key", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + res = ctx.find_appnet_var(f"*{b.name}_val") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"*{b.name}_val", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"*{b.name}_val", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + ctx.push_code(f"{b.name}_val = {b.name}.lookup(&{b.name}_key);") + BNAME = f"*{b.name}_val" + if ANAME != a.name and BNAME != b.name: + ctx.push_code(f"if({a.name}_val && {b.name}_val) {{") + ctx.push_code(f"if((*{a.name}_val) < (*{b.name}_val)) {{") + ctx.push_code(f"{res_native_var.name} = {ANAME};") + ctx.push_code(f"}} else {{") + ctx.push_code(f"{res_native_var.name} = {BNAME};") + ctx.push_code(f"}}") + ctx.push_code(f"}}") + elif ANAME != a.name: + ctx.push_code(f"if({a.name}_val) {{") + ctx.push_code(f"if((*{a.name}_val) < {b.name}) {{") + ctx.push_code(f"{res_native_var.name} = {ANAME};") + ctx.push_code(f"}} else {{") + ctx.push_code(f"{res_native_var.name} = {b.name};") + ctx.push_code(f"}}") + ctx.push_code(f"}}") + elif BNAME != b.name: + ctx.push_code(f"if({b.name}_val) {{") + ctx.push_code(f"if({a.name} < (*{b.name}_val)) {{") + ctx.push_code(f"{res_native_var.name} = {a.name};") + ctx.push_code(f"}} else {{") + ctx.push_code(f"{res_native_var.name} = {BNAME};") + ctx.push_code(f"}}") + ctx.push_code(f"}}") + else: + ctx.push_code(f"if({a.name} < {b.name}) {{") + ctx.push_code(f"{res_native_var.name} = {a.name};") + ctx.push_code(f"}} else {{") + ctx.push_code(f"{res_native_var.name} = {b.name};") + ctx.push_code("}") + return res_native_var + + def __init__(self): + super().__init__("min") + + +class GetRPCMeta(AppNetBuiltinFuncProto): + def __init__(self): + super().__init__("metaget") + self.field_name: str = "" + + def set_field_name(self, field_name: str): + self.field_name = field_name + + def instantiate(self, args: List[AppNetType]) -> bool: + ret = ( + len(args) == 2 + and isinstance(args[0], AppNetRPC) + and isinstance(args[1], AppNetString) + ) + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + assert isinstance(self.msg_type_dict, dict) + assert isinstance(self.appargs[1], AppNetString) + field = self.appargs[1] + assert isinstance(field.literal, str) + return proto_type_to_appnet_type(self.msg_type_dict[field.literal]) + + def gen_code( + self, ctx: eBPFContext, rpc: NativeVariable, field: NativeVariable + ) -> NativeVariable: + self.msg_type_dict = ctx.get_current_msg_fields() + self.native_arg_sanity_check([rpc, field]) + + header_path = ctx.getHeaderPath() + if "meta_status" in self.field_name: + ctx.resp_hdr_code.append( + f""" + auto __status_tmp = this->{header_path}->get(LowerCaseString(":status"))[0]->value().getStringView(); + std::string status_value = std::string(__status_tmp.data(), __status_tmp.size()); + if (status_value == "200") {{ + this->{header_path}->setCopy(LowerCaseString("meta_status"), "success"); + }} else {{ + this->{header_path}->setCopy(LowerCaseString("meta_status"), "failure"); + }} + """ + ) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + ctx.push_code( + f"auto __tmp = this->{header_path}->get(LowerCaseString({field.name}))[0]->value().getStringView();" + ) + ctx.push_code(native_decl_stmt) + ctx.push_code( + f"{res_native_var.name} = std::string(__tmp.data(), __tmp.size());" + ) + + return res_native_var + + +class GetRPCField(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + ret = ( + len(args) == 2 + and isinstance(args[0], AppNetRPC) + and isinstance(args[1], AppNetString) + ) + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + assert isinstance(self.msg_type_dict, dict) + assert isinstance(self.appargs[1], AppNetString) + field = self.appargs[1] + assert isinstance(field.literal, str) + return proto_type_to_appnet_type(self.msg_type_dict[field.literal]) + + def gen_code( + self, ctx: eBPFContext, rpc: NativeVariable, field: NativeVariable + ) -> NativeVariable: + self.msg_type_dict = ctx.get_current_msg_fields() + self.native_arg_sanity_check([rpc, field]) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + ctx.push_code(native_decl_stmt) + ctx.push_code( + f"{res_native_var.name} = get_rpc_field({rpc.name}, {field.name});" + ) + + return res_native_var + + def __init__(self): + print("lalalala") + super().__init__("get", "rpc_field") + self.msg_type_dict = None + + +class SetMap(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + ret = ( + len(args) == 3 + and isinstance(args[0], AppNetMap) + and args[0].key.is_same(args[1]) + and args[0].value.is_same(args[2]) + ) + ret = ret and args[0].decorator["consistency"] in ["None", "weak"] + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetVoid() + + def gen_code( + self, + ctx: eBPFContext, + map: NativeVariable, + key: NativeVariable, + value: NativeVariable, + ) -> None: + self.native_arg_sanity_check([map, key, value]) + + ctx.push_code(f"{map.name}[{key.name}] = {value.name};") + return None + + def __init__(self): + super().__init__("set", "map") + + +class ByteSize(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + # TODO: wasm needs field name for now, discarded + ret = len(args) == 2 and isinstance(args[0], AppNetRPC) + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetInt() + + def gen_code( + self, ctx: eBPFContext, rpc: NativeVariable, field_name: NativeVariable + ) -> NativeVariable: + self.native_arg_sanity_check([rpc, field_name]) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + ctx.push_code(native_decl_stmt) + ctx.push_code(f"{res_native_var.name} = {rpc.name}.ByteSizeLong();") + + return res_native_var + + def __init__(self): + super().__init__("byte_size") + + +class RandomF(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + ret = len(args) == 2 and args[0].is_arithmetic() and args[1].is_arithmetic() + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetFloat() + + def native_arg_sanity_check(self, native_args: List[NativeVariable]): + # allow int and float to be mixed + assert self.prepared + assert len(native_args) == 2 + assert native_args[0].type.is_arithmetic() + assert native_args[1].type.is_arithmetic() + + def gen_code( + self, ctx: eBPFContext, a: NativeVariable, b: NativeVariable + ) -> NativeVariable: + print(f"a.type = {a.type}, a = {a.name}, b.type = {b.type}, b = {b.name}") + self.native_arg_sanity_check([a, b]) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + + ctx.push_code(native_decl_stmt) + # get a random float number between a and b + # ctx.push_code( + # f"{res_native_var.name} = {a.name} + static_cast (rand()) / (static_cast (RAND_MAX/({b.name} - {a.name})));" + # ) + ctx.push_code( + f"{res_native_var.name} = {a.name} * 100 + bpf_get_prandom_u32() % ({b.name} * 100 - {a.name} * 100);" + ) + return res_native_var + + def __init__(self): + super().__init__("randomf") + + +# set(record_req, size(record_req), get(rpc, 'body')) +class SetVector(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + ret = ( + len(args) == 3 + and isinstance(args[0], AppNetVec) + and args[0].type.is_same(args[2]) + and isinstance(args[1], AppNetInt) + ) + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetVoid() + + def gen_code( + self, + ctx: eBPFContext, + vec: NativeVariable, + index: NativeVariable, + value: NativeVariable, + ) -> None: + self.native_arg_sanity_check([vec, index, value]) + ctx.push_code(f"if ({index.name} >= static_cast({vec.name}.size())) {{") + ctx.push_code(f" {vec.name}.resize({index.name} + 1);") + ctx.push_code("}") + ctx.push_code(f"{vec.name}[{index.name}] = {value.name};") + return None + + def __init__(self): + super().__init__("set", "vector") + + +class SizeVector(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + ret = len(args) == 1 and isinstance(args[0], AppNetVec) + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetInt() + + def gen_code(self, ctx: eBPFContext, vec: NativeVariable) -> NativeVariable: + self.native_arg_sanity_check([vec]) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + ctx.push_code(native_decl_stmt) + ctx.push_code(f"{res_native_var.name} = {vec.name}.size();") + + return res_native_var + + def __init__(self): + super().__init__("size", "vector") + + +class SetRPCField(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + # TODO: For now, we assume the rpc field is a string + ret = ( + len(args) == 3 + and isinstance(args[0], AppNetRPC) + and isinstance(args[1], AppNetString) + and args[2].is_string() + ) + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetVoid() + + def gen_code( + self, + ctx: eBPFContext, + rpc: NativeVariable, + field: NativeVariable, + value: NativeVariable, + ) -> None: + self.native_arg_sanity_check([rpc, field, value]) + + ctx.push_code(f"set_rpc_field({rpc.name}, {field.name}, {value.name});") + buffer_name = ( + "request_buffer_" if ctx.current_procedure == "req" else "response_buffer_" + ) + ctx.push_code(f"replace_payload(this->{buffer_name}, {rpc.name});") + return None + + def __init__(self): + super().__init__("set", "rpc_field") + + +class RPCID(AppNetBuiltinFuncProto): + # func() -> uint + def instantiate(self, args: List[AppNetType]) -> bool: + ret = len(args) == 0 + if ret: + self.prepared = True + return ret + + def ret_type(self) -> AppNetType: + return AppNetUInt() + + def gen_code(self, ctx: eBPFContext) -> NativeVariable: + rpc_id_str, decl = ctx.declareeBPFVar( + ctx.new_temporary_name(), NativeString() + ) + ctx.push_code(decl) + ctx.push_code(f'{rpc_id_str.name} = "appnet-rpc-id";') + + rpcHeaderFunc: GetRPCHeader = GetRPCHeader() + # Mock instantiate + ret = rpcHeaderFunc.instantiate([AppNetRPC(), AppNetString()]) + assert ret == True + + ret = rpcHeaderFunc.gen_code(ctx, None, rpc_id_str, forced_ret_type=AppNetUInt()) # type: ignore + return ret + + def __init__(self): + super().__init__("rpc_id") + + +class SetMapStrongConsistency(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + ret = ( + len(args) == 3 + and isinstance(args[0], AppNetMap) + and args[0].key.is_same(args[1]) + and args[0].value.is_same(args[2]) + ) + ret = ret and args[0].decorator["consistency"] == "strong" + if ret: + assert isinstance(args[0], AppNetMap) + assert isinstance(args[1], AppNetType) + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetVoid() + + def gen_code( + self, + ctx: eBPFContext, + map: NativeVariable, + key: NativeVariable, + value: NativeVariable, + ) -> None: + self.native_arg_sanity_check([map, key, value]) + + url_native_var, url_decl_str = ctx.declareeBPFVar( + ctx.new_temporary_name(), NativeString() + ) + ctx.push_code(url_decl_str) + # if map.value is int, we need to convert it to string + assert isinstance(self.appargs[0], AppNetMap) + if isinstance(self.appargs[0].value, AppNetInt): + ctx.push_code( + f'{url_native_var.name} = "/SET/" + {key.name} + "/" + std::to_string({value.name});' + ) + else: + ctx.push_code( + f'{url_native_var.name} = "/SET/" + {key.name} + "/" + base64_encode({value.name}, true);' + ) + + if ctx.current_procedure == "req": + # If in resp or request, we send blocking call SET to webdis + # Construct the URL + ctx.genBlockingWebdisRequest(url_native_var) + elif ctx.current_procedure in ["init", "resp"]: + if ctx.current_procedure == "resp": + LOG.warn( + "strong consistency operation is automatically converted to weak consistency in response procedure." + ) + # we just send non blocking here. + ctx.genNonBlockingWebdisRequest(url_native_var) + + def __init__(self): + super().__init__("set", "map_strong") + + +class Max(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + ret = len(args) == 2 and args[0].is_arithmetic() and args[1].is_arithmetic() + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + assert self.prepared + return ( + AppNetFloat() + if self.appargs[0].is_float() or self.appargs[1].is_float() + else AppNetInt() + ) + + def gen_code( + self, ctx: eBPFContext, a: NativeVariable, b: NativeVariable + ) -> NativeVariable: + self.native_arg_sanity_check([a, b]) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + ctx.push_code(native_decl_stmt) + ANAME = a.name + BNAME = b.name + if ctx.find_appnet_var(a.name): + res = ctx.find_appnet_var(f"{a.name}_key") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"{a.name}_key", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"{a.name}_key", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + res = ctx.find_appnet_var(f"*{a.name}_val") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"*{a.name}_val", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"*{a.name}_val", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + ctx.push_code(code=f"{a.name}_val = {a.name}.lookup(&{a.name}_key);") + ANAME = f"*{a.name}_val" + if ctx.find_appnet_var(b.name): + res = ctx.find_appnet_var(f"{b.name}_key") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"{b.name}_key", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"{b.name}_key", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + res = ctx.find_appnet_var(f"*{b.name}_val") + if res is None: + rhs = ctx.declareAppNetLocalVariable(f"*{b.name}_val", AppNetUInt32()) + eBPF_rhs, decl = ctx.declareeBPFVar(f"*{b.name}_val", rhs.type.to_native()) + rhs.native_var = eBPF_rhs + ctx.push_code(decl) + ctx.push_code(f"{b.name}_val = {b.name}.lookup(&{b.name}_key);") + BNAME = f"*{b.name}_val" + if ANAME != a.name and BNAME != b.name: + ctx.push_code(f"if({a.name}_val && {b.name}_val) {{") + ctx.push_code(f"if((*{a.name}_val) > (*{b.name}_val)) {{") + ctx.push_code(f"{res_native_var.name} = {ANAME};") + ctx.push_code(f"}} else {{") + ctx.push_code(f"{res_native_var.name} = {BNAME};") + ctx.push_code(f"}}") + ctx.push_code(f"}}") + elif ANAME != a.name: + ctx.push_code(f"if({a.name}_val) {{") + ctx.push_code(f"if((*{a.name}_val) > {b.name}) {{") + ctx.push_code(f"{res_native_var.name} = {ANAME};") + ctx.push_code(f"}} else {{") + ctx.push_code(f"{res_native_var.name} = {b.name};") + ctx.push_code(f"}}") + ctx.push_code(f"}}") + elif BNAME != b.name: + ctx.push_code(f"if({b.name}_val) {{") + ctx.push_code(f"if({a.name} > (*{b.name}_val)) {{") + ctx.push_code(f"{res_native_var.name} = {a.name};") + ctx.push_code(f"}} else {{") + ctx.push_code(f"{res_native_var.name} = {BNAME};") + ctx.push_code(f"}}") + ctx.push_code(f"}}") + else: + ctx.push_code(f"if({a.name} > {b.name}) {{") + ctx.push_code(f"{res_native_var.name} = {a.name};") + ctx.push_code(f"}} else {{") + ctx.push_code(f"{res_native_var.name} = {b.name};") + ctx.push_code("}") + return res_native_var + + def __init__(self): + super().__init__("max") + + +class GetBackEnds(AppNetBuiltinFuncProto): + # func(int) -> vec + def instantiate(self, args: List[AppNetType]) -> bool: + ret = len(args) == 1 and args[0].is_int() + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetVec(AppNetInt()) + + def gen_code( + self, ctx: eBPFContext, backend_name: NativeVariable + ) -> NativeVariable: + self.native_arg_sanity_check([backend_name]) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + + ctx.push_code(native_decl_stmt) + # curl "http://10.96.88.99:8080/getReplica?key=23&service=ServiceB" + # {"replica_id":[0,2]} + + # we need to send a request to the shard-manager cluster, and parse the response + ctx.push_code("{") + + url_native, decl = ctx.declareeBPFVar( + ctx.new_temporary_name(), NativeString() + ) + ctx.push_code(decl) + ctx.push_code( + f'{url_native.name} = "/getReplica?key=" + std::to_string({backend_name.name}) + "&service=ServiceB";' + ) + ctx.genBlockingHttpRequest("shard-manager", url_native) + + # parse the response + ctx.push_code("if (this->external_response_ == nullptr) {") + ctx.push_code( + ' ENVOY_LOG(error, "[AppNet Filter] shard manager HTTP Request Failed");' + ) + ctx.push_code(" std::terminate();") + ctx.push_code("}") + + ctx.push_code( + f"std::string response_str = this->external_response_->bodyAsString();" + ) + ctx.push_code( + f'ENVOY_LOG(info, "[AppNet Filter] Shard Manager Response: {{}}", response_str);' + ) + ctx.push_code(f"nlohmann::json j = nlohmann::json::parse(response_str);") + ctx.push_code(f'if (j.contains("replica_id"))') + ctx.push_code("{") + ctx.push_code(f' for (auto& id : j["replica_id"])') + ctx.push_code("{") + ctx.push_code(f" {res_native_var.name}.push_back(id);") + ctx.push_code("}") + ctx.push_code("}") + ctx.push_code("else") + ctx.push_code("{") + ctx.push_code( + f' ENVOY_LOG(error, "[AppNet Filter] Shard Manager Response is in wrong format");' + ) + ctx.push_code(" std::terminate();") + ctx.push_code("}") + + ctx.push_code("}") + return res_native_var + + def __init__(self): + super().__init__("get_backends") + + +class RandomChoices(AppNetBuiltinFuncProto): + # func(vec, int) -> vec + + def instantiate(self, args: List[AppNetType]) -> bool: + ret = len(args) == 2 and isinstance(args[0], AppNetVec) and args[1].is_int() + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + assert self.prepared + return self.appargs[0] + + def gen_code( + self, ctx: eBPFContext, vec: NativeVariable, num: NativeVariable + ) -> NativeVariable: + self.native_arg_sanity_check([vec, num]) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + + ctx.push_code(native_decl_stmt) + # declare a random number generator + + ctx.push_code("{") + ctx.push_code(f"std::random_device rd;") + ctx.push_code(f"std::mt19937 gen(rd());") + ctx.push_code(f"std::uniform_int_distribution<> dis(0, {vec.name}.size() - 1);") + ctx.push_code(f"std::vector indices;") + # We need to pick different random numbers out of the vector + ctx.push_code(f"for (int i = 0; i < {num.name}; i++)") + ctx.push_code("{") + ctx.push_code(f" int index = dis(gen);") + ctx.push_code(f" indices.push_back(index);") + ctx.push_code("}") + # random shuffle + ctx.push_code(f"std::shuffle(indices.begin(), indices.end(), gen);") + # copy the selected elements + ctx.push_code(f"for (int i = 0; i < {num.name}; i++)") + ctx.push_code("{") + ctx.push_code(f" {res_native_var.name}.push_back({vec.name}[indices[i]]);") + ctx.push_code("}") + ctx.push_code("}") + + return res_native_var + + def __init__(self): + super().__init__("random_choices") + + +class GetLoad(AppNetBuiltinFuncProto): + # func(int) -> pair + def instantiate(self, args: List[AppNetType]) -> bool: + ret = len(args) == 1 and args[0].is_int() + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetPair(AppNetInt(), AppNetInstant()) + + def gen_code( + self, ctx: eBPFContext, backend_id: NativeVariable + ) -> NativeVariable: + self.native_arg_sanity_check([backend_id]) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + ctx.push_code(native_decl_stmt) + + # curl "http://load-manager/getLoadInfo?service-name=my-service&replica-ids=0,1,2" + # {"0":{"load":7,"timestamp":1724368802.8939054},"1":{"load":5,"timestamp":1724368802.8939054},"2":{"load":5,"timestamp":1724368802.8939054}} + + # we need to send a request to the load-manager cluster, and parse the response + ctx.push_code("{") + url_native_var, url_decl_str = ctx.declareeBPFVar( + ctx.new_temporary_name(), NativeString() + ) + ctx.push_code(url_decl_str) + ctx.push_code( + f'{url_native_var.name} = "/getLoadInfo?service-name=my-service&replica-ids=" + std::to_string({backend_id.name});' + ) + ctx.genBlockingHttpRequest("load-manager", url_native_var) + + # parse the response + ctx.push_code("if (this->external_response_ == nullptr) {") + ctx.push_code( + ' ENVOY_LOG(error, "[AppNet Filter] load manager HTTP Request Failed");' + ) + ctx.push_code(" std::terminate();") + ctx.push_code("}") + + ctx.push_code( + f"std::string response_str = this->external_response_->bodyAsString();" + ) + ctx.push_code( + f'ENVOY_LOG(info, "[AppNet Filter] Load Manager Response: {{}}", response_str);' + ) + ctx.push_code(f"nlohmann::json j = nlohmann::json::parse(response_str);") + # we need to cast backend_id into string + ctx.push_code( + f"std::string __backend_id_str = std::to_string({backend_id.name});" + ) + ctx.push_code(f"if (j.contains(__backend_id_str))") + ctx.push_code("{") + ctx.push_code(f' {res_native_var.name}.first = j[__backend_id_str]["load"];') + # translate the timestamp into std::chrono::system_clock::time_point + ctx.push_code( + f' {res_native_var.name}.second = std::chrono::system_clock::from_time_t(j[__backend_id_str]["timestamp"]);' + ) + ctx.push_code("}") + ctx.push_code("else") + ctx.push_code("{") + ctx.push_code( + f' ENVOY_LOG(error, "[AppNet Filter] Load Manager Response is in wrong format");' + ) + ctx.push_code(" std::terminate();") + ctx.push_code("}") + ctx.push_code("}") + + return res_native_var + + def __init__(self): + super().__init__("get_load") + + +class EstimateRIFDistribution(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + ret = len(args) == 1 and args[0].is_vec() + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetVec(AppNetFloat()) + + def gen_code(self, ctx: eBPFContext, backends: NativeVariable) -> NativeVariable: + self.native_arg_sanity_check([backends]) + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + ctx.push_code(native_decl_stmt) + ctx.push_code("{") + + # generate query url + ctx.push_code(f"nlohmann::json jsonlist = {backends.name};") + url_native_var, url_decl_str = ctx.declareeBPFVar( + ctx.new_temporary_name(), NativeString() + ) + ctx.push_code(url_decl_str) + ctx.push_code( + f'{url_native_var.name} = "/getEstimatedRIFDistribution?backends=" + jsonlist.dump();' + ) + ctx.genBlockingHttpRequest("prequal-manager", url_native_var) + + # get response + ctx.push_code("if (this->external_response_ == nullptr) {") + ctx.push_code( + ' ENVOY_LOG(error, "[AppNet Filter] prequal manager HTTP Request Failed");' + ) + ctx.push_code(" std::terminate();") + ctx.push_code("}") + ctx.push_code( + f"std::string response_str = this->external_response_->bodyAsString();" + ) + ctx.push_code( + f'ENVOY_LOG(info, "[AppNet Filter] Load Manager Response: {{}}", response_str);' + ) + + # parse response as vec + ctx.push_code(f"nlohmann::json j = nlohmann::json::parse(response_str);") + ctx.push_code(f"{res_native_var.name} = j.get>();") + ctx.push_code("}") + + return res_native_var + + def __init__(self): + super().__init__("estimate_RIF_distribution") + + +class Quantile(AppNetBuiltinFuncProto): + def instantiate(self, args: List[AppNetType]) -> bool: + ret = ( + len(args) == 2 and + args[0].is_vec() and + args[1].is_float() + ) + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return self.appargs[0].type + + def gen_code(self, ctx: eBPFContext, vec: NativeVariable, q: NativeVariable) -> NativeVariable: + self.native_arg_sanity_check([vec, q]) + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + ctx.push_code(native_decl_stmt) + ctx.push_code("{") + # get sorted vector + ctx.push_code(f"std::vector sorted_vec = {vec.name};") + ctx.push_code("std::sort(sorted_vec.begin(), sorted_vec.end());") + # find quantile position + ctx.push_code(f"int idx = static_cast({q.name} * sorted_vec.size()) - 1;") + ctx.push_code(f"{res_native_var.name} = sorted_vec[idx];") + ctx.push_code("}") + + return res_native_var + + def __init__(self): + super().__init__("quantile") + + +class GetRPCHeader(AppNetBuiltinFuncProto): + # TODO: GetRPCHeader needs to infer the return type from the assign stmt. + # TODO: For now we use the most recent assign stmt to infer the return type, which is quite dirty. + + # func(rpc, str) -> ? according to the assign stmt inference result. + def instantiate(self, args: List[AppNetType]) -> bool: + ret = ( + len(args) == 2 + and isinstance(args[0], AppNetRPC) + and isinstance(args[1], AppNetString) + ) + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + assert self.ret_type_inferred is not None + return self.ret_type_inferred + + def native_arg_sanity_check(self, native_args: List[NativeVariable]): + + assert self.prepared + assert len(native_args) == 2 + assert native_args[1].type.is_string() + + def gen_code( + self, + ctx: eBPFContext, + _rpc: NativeVariable, + field: NativeVariable, + *, + forced_ret_type: Optional[AppNetType] = None, + ) -> NativeVariable: + # Note that we specialize the native arg check for this function because + # 1. We don't really use RPC in the function body. + # 2. RPCID will pass a None to here. + self.native_arg_sanity_check([_rpc, field]) + + if forced_ret_type is not None: + self.ret_type_inferred = forced_ret_type + else: + assert ctx.most_recent_assign_left_type is not None + self.ret_type_inferred = ctx.most_recent_assign_left_type + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + + ctx.push_code(native_decl_stmt) + ctx.push_code("{") + + # auto a = map->get(LowerCaseString("1")); + # auto b = a[0]->value().getStringView(); + # auto str = std::string(b.data(), b.size()); + + header_path = ctx.getHeaderPath() + + ctx.push_code( + f"auto __tmp = this->{header_path}->get(LowerCaseString({field.name}))[0]->value().getStringView();" + ) + ctx.push_code( + f"std::string __tmp_str = std::string(__tmp.data(), __tmp.size());" + ) + + if self.ret_type_inferred.is_int() or self.ret_type_inferred.is_uint(): + # use stoul + ctx.push_code(f"try {{") + ctx.push_code(f" {res_native_var.name} = std::stoul(__tmp_str);") + ctx.push_code(f"}} catch (...) {{") + ctx.push_code( + f' ENVOY_LOG(error, "[AppNet Filter] Failed to convert string to unsigned long");' + ) + ctx.push_code("}") + elif self.ret_type_inferred.is_string(): + ctx.push_code(f"{res_native_var.name} = __tmp_str;") + else: + raise NotImplementedError("only int/uint/string type is supported for now") + + ctx.push_code("}") + + return res_native_var + + def __init__(self): + super().__init__("get_rpc_header") + self.ret_type_inferred: Optional[AppNetType] = None + + +class SetRPCHeader(AppNetBuiltinFuncProto): + # func(rpc, str, ?) -> void + def instantiate(self, args: List[AppNetType]) -> bool: + ret = ( + len(args) == 3 + and isinstance(args[0], AppNetRPC) + and isinstance(args[1], AppNetString) + ) + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetVoid() + + def gen_code( + self, + ctx: eBPFContext, + rpc: NativeVariable, + field: NativeVariable, + value: NativeVariable, + ) -> None: + self.native_arg_sanity_check([rpc, field, value]) + + if self.appargs[2].is_int(): + header_path = ctx.getHeaderPath() + ctx.push_code( + f"this->{header_path}->setCopy(LowerCaseString({field.name}), std::to_string({value.name}));" + ) + else: + raise NotImplementedError("only int type is supported for now") + + return None + + def __init__(self): + super().__init__("set_rpc_header") + + +class DeleteMap(AppNetBuiltinFuncProto): + # func(map, key) -> void + def instantiate(self, args: List[AppNetType]) -> bool: + ret = ( + len(args) == 2 + and isinstance(args[0], AppNetMap) + and args[0].key.is_same(args[1]) + ) + ret = ret and args[0].decorator["consistency"] in ["None", "weak"] + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetVoid() + + def gen_code( + self, ctx: eBPFContext, map: NativeVariable, key: NativeVariable + ) -> None: + self.native_arg_sanity_check([map, key]) + + ctx.push_code(f"{map.name}.erase({key.name});") + return None + + def __init__(self): + super().__init__("delete", "map") + + +class SetMetadata(AppNetBuiltinFuncProto): + # Example: + # auto a = ProtobufWkt::Struct(); + # // add rpc-id to dynamic metadata + # a.mutable_fields()->insert({"rpc-id", ValueUtil::stringValue("123")}); + # this->decoder_callbacks_->streamInfo().setDynamicMetadata("rpc-id", a); + + # func(str, T) -> void + def instantiate(self, args: List[AppNetType]) -> bool: + ret = ( + len(args) == 2 and isinstance(args[0], AppNetString) and args[1].is_basic() + ) + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetVoid() + + def gen_code( + self, ctx: eBPFContext, key: NativeVariable, value: NativeVariable + ) -> None: + self.native_arg_sanity_check([key, value]) + + if ( + isinstance(self.appargs[1], AppNetInt) == False + and isinstance(self.appargs[1], AppNetUInt) == False + ): + raise NotImplementedError("only int/uint type is supported for now") + + ctx.push_code("{") + # cast it into string first + ctx.push_code(f"std::string __tmp_str = std::to_string({value.name});") + ctx.push_code(f"ProtobufWkt::Struct __tmp;") + ctx.push_code( + f"__tmp.mutable_fields()->insert({{{{{key.name}, ValueUtil::stringValue(__tmp_str)}}}});" + ) + ctx.push_code( + f'{ctx.get_callback_name()}->streamInfo().setDynamicMetadata("appnet", __tmp);' + ) + ctx.push_code("}") + return None + + def __init__(self): + super().__init__("set_metadata") + + +class GetMetadata(AppNetBuiltinFuncProto): + # Example: + # auto a = ProtobufWkt::Struct(); + # // fetch rpc-id from dynamic metadata + # const envoy::config::core::v3::Metadata& metadata = this->encoder_callbacks_->streamInfo().dynamicMetadata(); + # const auto& rpc_id = metadata.filter_metadata().at("rpc-id").fields().at("rpc-id").string_value(); + # ENVOY_LOG(info, "[Ratelimit Filter] rpc-id={}", rpc_id); + + # func(str) -> ? + # use ctx.most_recent_assign_left_type to infer the return type + def instantiate(self, args: List[AppNetType]) -> bool: + ret = len(args) == 1 and isinstance(args[0], AppNetString) + if ret: + self.prepared = True + self.appargs = args + self.ret_type_inferred = None + return ret + + def ret_type(self) -> AppNetType: + assert self.ret_type_inferred is not None + return self.ret_type_inferred + + def gen_code(self, ctx: eBPFContext, key: NativeVariable) -> NativeVariable: + self.native_arg_sanity_check([key]) + + assert ctx.most_recent_assign_left_type is not None + self.ret_type_inferred = ctx.most_recent_assign_left_type + if ( + self.ret_type_inferred.is_int() == False + and self.ret_type_inferred.is_uint() == False + ): + raise NotImplementedError("only int type is supported for now") + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + ctx.push_code(native_decl_stmt) + + ctx.push_code("{") + ctx.push_code( + f"const envoy::config::core::v3::Metadata& metadata = {ctx.get_callback_name()}->streamInfo().dynamicMetadata();" + ) + ctx.push_code( + f'const auto& __tmp = metadata.filter_metadata().at("appnet").fields().at({key.name});' + ) + ctx.push_code(f"std::string __tmp_str = __tmp.string_value();") + ctx.push_code(f"try {{") + # to int + ctx.push_code(f" {res_native_var.name} = std::stoi(__tmp_str);") + ctx.push_code(f"}} catch (...) {{") + ctx.push_code( + f' ENVOY_LOG(error, "[AppNet Filter] Failed to convert string to int (or uint)");' + ) + ctx.push_code("}") + ctx.push_code("}") + + return res_native_var + + def __init__(self): + super().__init__("get_metadata") + + +class Encrypt(AppNetBuiltinFuncProto): + # func(msg: str, password: str) -> new_msg: str + def instantiate(self, args: List[AppNetType]) -> bool: + ret = len(args) == 2 and args[0].is_string() and args[1].is_string() + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetString() + + def gen_code( + self, ctx: eBPFContext, msg: NativeVariable, password: NativeVariable + ) -> NativeVariable: + self.native_arg_sanity_check([msg, password]) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + ctx.push_code(native_decl_stmt) + + ctx.push_code("{") + ctx.push_code(f"std::string __tmp_str;") + ctx.push_code(f"std::string __password_str = {password.name};") + ctx.push_code(f"std::string __msg_str = {msg.name};") + ctx.push_code(f"for (size_t i = 0; i < __msg_str.size(); i++)") + ctx.push_code("{") + ctx.push_code( + f" __tmp_str.push_back(__msg_str[i] ^ __password_str[i % __password_str.size()]);" + ) + ctx.push_code("}") + ctx.push_code(f"{res_native_var.name} = __tmp_str;") + ctx.push_code("}") + + return res_native_var + + def __init__(self): + super().__init__("encrypt") + + +class Decrypt(AppNetBuiltinFuncProto): + # func(msg: str, password: str) -> new_msg: str + def instantiate(self, args: List[AppNetType]) -> bool: + ret = len(args) == 2 and args[0].is_string() and args[1].is_string() + if ret: + self.prepared = True + self.appargs = args + return ret + + def ret_type(self) -> AppNetType: + return AppNetString() + + def gen_code( + self, ctx: eBPFContext, msg: NativeVariable, password: NativeVariable + ) -> NativeVariable: + self.native_arg_sanity_check([msg, password]) + + ( + res_native_var, + native_decl_stmt, + ) = ctx.declareeBPFVar(ctx.new_temporary_name(), self.ret_type().to_native()) + ctx.push_code(native_decl_stmt) + + ctx.push_code("{") + ctx.push_code(f"std::string __tmp_str;") + ctx.push_code(f"std::string __password_str = {password.name};") + ctx.push_code(f"std::string __msg_str = {msg.name};") + ctx.push_code(f"for (size_t i = 0; i < __msg_str.size(); i++)") + ctx.push_code("{") + ctx.push_code( + f" __tmp_str.push_back(__msg_str[i] ^ __password_str[i % __password_str.size()]);" + ) + ctx.push_code("}") + ctx.push_code(f"{res_native_var.name} = __tmp_str;") + ctx.push_code("}") + + return res_native_var + + def __init__(self): + super().__init__("decrypt") + + +APPNET_BUILTIN_FUNCS = [ + GetRPCMeta, + GetRPCField, + GetMap, + CurrentTime, + TimeDiff, + Min, + Max, + SetMap, + ByteSize, + RandomF, + SetVector, + SizeVector, + SetRPCField, + RPCID, + GetMapStrongConsistency, + SetMapStrongConsistency, + GetBackEnds, + RandomChoices, + GetLoad, + GetRPCHeader, + SetRPCHeader, + DeleteMap, + GetMetadata, + SetMetadata, + Encrypt, + Decrypt, + EstimateRIFDistribution, + Quantile, +] diff --git a/compiler/element/backend/eBPF/nativetype.py b/compiler/element/backend/eBPF/nativetype.py new file mode 100644 index 00000000..54d465f1 --- /dev/null +++ b/compiler/element/backend/eBPF/nativetype.py @@ -0,0 +1,248 @@ +from __future__ import annotations + +import copy +from enum import Enum +from typing import List, Optional + +from compiler.element.backend.eBPF import * +from compiler.element.logger import ELEMENT_LOG as LOG + + +class NativeType: + def gen_decl(self, name: str) -> str: + raise NotImplementedError(f"gen_decl not implemented for {self}") + + def gen_decl_local(self, name: str) -> str: + raise NotImplementedError(f"gen_decl_local not implemented for {self}") + + def is_arithmetic(self) -> bool: + return ( + isinstance(self, Int) or isinstance(self, Float) or isinstance(self, UInt) + ) + + def is_string(self) -> bool: + return isinstance(self, String) + + def is_bool(self) -> bool: + return isinstance(self, Bool) + + def is_basic(self) -> bool: + return self.is_arithmetic() or self.is_bool() + + def is_rpc(self) -> bool: + return isinstance(self, RPC) + + def is_timepoint(self) -> bool: + return isinstance(self, Timepoint) + + def type_name(self) -> str: + raise Exception(f"type_name not implemented for {self}") + + def is_same(self, other: NativeType) -> bool: + # not apply for complex types + assert ( + self.is_basic() or self.is_string() or self.is_timepoint() or self.is_rpc() + ) + return type(self) == type(other) + + +class RPC(NativeType): + def type_name(self) -> str: + return "::appnetsamplefilter::Msg" + + def gen_decl(self, name: str) -> str: + return f"::appnetsamplefilter::Msg {name};" + + def gen_decl_local(self, name: str) -> str: + return f"::appnetsamplefilter::Msg {name};" + + +class Timepoint(NativeType): + def type_name(self) -> str: + # return "std::chrono::time_point" + return "u64" + + def gen_decl(self, name: str) -> str: + # return f"std::chrono::time_point {name};" + return f"BPF_ARRAY({name}, u64, 1);" + + def gen_decl_local(self, name: str) -> str: + # return f"std::chrono::time_point {name};" + return f"u64 {name} = 0;" + +class Int(NativeType): + def type_name(self) -> str: + return "s32" + + def gen_decl(self, name: str) -> str: + return f"BPF_ARRAY({name}, s64, 1);" + + def gen_decl_local(self, name: str) -> str: + return f"s32 {name} = 0;" + +class UInt32(NativeType): + def type_name(self) -> str: + return "u32" + + def gen_decl(self, name: str) -> str: + return f"BPF_ARRAY({name}, u32, 1);" + + def gen_decl_local(self, name: str) -> str: + return f"u32 {name} = 0;" + + +class UInt(NativeType): + def type_name(self) -> str: + return "u64" + + def gen_decl(self, name: str) -> str: + return f"BPF_ARRAY({name}, u64, 1);" + + def gen_decl_local(self, name: str) -> str: + return f"u64 {name} = 0;" + + +class Float(NativeType): + def type_name(self) -> str: + return "float" + + def gen_decl(self, name: str) -> str: + return f"float {name} = 0;" + + def gen_decl_local(self, name: str) -> str: + return f"float {name} = 0;" + + +class String(NativeType): + def type_name(self) -> str: + return "std::string" + + def gen_decl(self, name: str) -> str: + return f'std::string {name} = "";' + + def gen_decl_local(self, name: str) -> str: + return f'std::string {name} = "";' + + +class Bool(NativeType): + def type_name(self) -> str: + return "bool" + + def gen_decl(self, name: str) -> str: + return f"bool {name} = false;" + + def gen_decl_local(self, name: str) -> str: + return f"bool {name} = false;" + + +class Bytes(NativeType): + def type_name(self) -> str: + return "std::vector" + + def gen_decl(self, name: str) -> str: + return f"std::vector {name}{{0}};" + + def gen_decl_local(self, name: str) -> str: + return f"std::vector {name}{{0}};" + +class Option(NativeType): + inner: NativeType + + def __init__(self, inner: NativeType): + self.inner = inner + + def gen_decl(self, name: str) -> str: + return f"std::optional<{self.inner.type_name()}> {name} = std::nullopt;" + + def gen_decl_local(self, name: str) -> str: + return f"std::optional<{self.inner.type_name()}> {name} = std::nullopt;" + + def is_same(self, other: NativeType) -> bool: + if not isinstance(other, Option): + return False + return self.inner.is_same(other.inner) + + def type_name(self) -> str: + return f"std::optional<{self.inner.type_name()}>" + + +class Map(NativeType): + key: NativeType + value: NativeType + + def __init__(self, key: NativeType, value: NativeType): + self.key = key + self.value = value + + def gen_decl(self, name: str) -> str: + return ( + f"BPF_HASH({name}, {self.key.type_name()}, {self.value.type_name()});" + ) + + def gen_decl_local(self, name: str) -> str: + return ( + f"BPF_HASH({name}, {self.key.type_name()}, {self.value.type_name()});" + ) + + def is_same(self, other: NativeType) -> bool: + if not isinstance(other, Map): + return False + return self.key.is_same(other.key) and self.value.is_same(other.value) + + def type_name(self) -> str: + return f"std::map<{self.key.type_name()}, {self.value.type_name()}>" + + +class Vec(NativeType): + type: NativeType + + def __init__(self, type: NativeType): + self.type = type + + def gen_decl(self, name: str) -> str: + return f"std::vector<{self.type.type_name()}> {name} = {{}};" + + def gen_decl_local(self, name: str) -> str: + return f"std::vector<{self.type.type_name()}> {name} = {{}};" + + def is_same(self, other: NativeType) -> bool: + if not isinstance(other, Vec): + return False + return self.type.is_same(other.type) + + def type_name(self) -> str: + return f"std::vector<{self.type.type_name()}>" + + +class Pair(NativeType): + first: NativeType + second: NativeType + + def __init__(self, first: NativeType, second: NativeType): + self.first = first + self.second = second + + def gen_decl(self, name: str) -> str: + return f"std::pair<{self.first.type_name()}, {self.second.type_name()}> {name};" + + def gen_decl_local(self, name: str) -> str: + return f"std::pair<{self.first.type_name()}, {self.second.type_name()}> {name};" + + def is_same(self, other: NativeType) -> bool: + if not isinstance(other, Pair): + return False + return self.first.is_same(other.first) and self.second.is_same(other.second) + + def type_name(self) -> str: + return f"std::pair<{self.first.type_name()}, {self.second.type_name()}>" + + +class NativeVariable: + name: str + type: NativeType + local: bool # is this variable local to a request or not + + def __init__(self, name: str, type: NativeType, local: bool): + self.name = name + self.type = type + self.local = local diff --git a/compiler/element/backend/eBPF/types.py b/compiler/element/backend/eBPF/types.py new file mode 100644 index 00000000..595274ef --- /dev/null +++ b/compiler/element/backend/eBPF/types.py @@ -0,0 +1,36 @@ +from compiler.element.backend.eBPF import * +from compiler.element.backend.eBPF.appnettype import RPC as AppNetRPC +from compiler.element.backend.eBPF.appnettype import AppNetType, AppNetVariable +from compiler.element.backend.eBPF.appnettype import Bool as AppNetBool +from compiler.element.backend.eBPF.appnettype import Bytes as AppNetBytes +from compiler.element.backend.eBPF.appnettype import Float as AppNetFloat +from compiler.element.backend.eBPF.appnettype import Instant as AppNetInstant +from compiler.element.backend.eBPF.appnettype import Int as AppNetInt +from compiler.element.backend.eBPF.appnettype import Map as AppNetMap +from compiler.element.backend.eBPF.appnettype import Option as AppNetOption +from compiler.element.backend.eBPF.appnettype import Pair as AppNetPair +from compiler.element.backend.eBPF.appnettype import String as AppNetString +from compiler.element.backend.eBPF.appnettype import UInt as AppNetUInt +from compiler.element.backend.eBPF.appnettype import UInt32 as AppNetUInt32 +from compiler.element.backend.eBPF.appnettype import Vec as AppNetVec +from compiler.element.backend.eBPF.appnettype import Void as AppNetVoid +from compiler.element.backend.eBPF.appnettype import ( + appnet_type_from_str, + proto_type_to_appnet_type, +) +from compiler.element.backend.eBPF.nativetype import RPC as NativeRPC +from compiler.element.backend.eBPF.nativetype import Bool as NativeBool +from compiler.element.backend.eBPF.nativetype import Bytes as NativeBytes +from compiler.element.backend.eBPF.nativetype import Float as NativeFloat +from compiler.element.backend.eBPF.nativetype import Int as NativeInt +from compiler.element.backend.eBPF.nativetype import Map as NativeMap +from compiler.element.backend.eBPF.nativetype import NativeType, NativeVariable +from compiler.element.backend.eBPF.nativetype import Option as NativeOption +from compiler.element.backend.eBPF.nativetype import Pair as NativePair +from compiler.element.backend.eBPF.nativetype import String as NativeString +from compiler.element.backend.eBPF.nativetype import ( + Timepoint as NativeTimepoint, +) +from compiler.element.backend.eBPF.nativetype import UInt as NativeUInt +from compiler.element.backend.eBPF.nativetype import UInt32 as NativeUInt32 +from compiler.element.backend.eBPF.nativetype import Vec as NativeVec diff --git a/compiler/element/node.py b/compiler/element/node.py index 0a462434..875553b0 100644 --- a/compiler/element/node.py +++ b/compiler/element/node.py @@ -161,6 +161,12 @@ def __init__(self, name: str, consistency: str, combiner: str, persistence: bool self.persistence = persistence +import re +def is_integer(s): + return re.fullmatch(r"-?\d+", s) is not None +def is_float(s): + return re.fullmatch(r"-?\d+\.\d+", s) is not None + class Literal(Node): def __init__(self, value: str): self.value = value @@ -168,6 +174,10 @@ def __init__(self, value: str): # currently only String and Bool are supported. if value.startswith("'") and value.endswith("'"): self.type = DataType.STR + elif is_integer(value): + self.type = DataType.INT + elif is_float(value): + self.type = DataType.FLOAT elif value in ["true", "false"]: self.type = DataType.BOOL else: diff --git a/compiler/graph/ir/__init__.py b/compiler/graph/ir/__init__.py index 288b6ac4..1d8a4411 100644 --- a/compiler/graph/ir/__init__.py +++ b/compiler/graph/ir/__init__.py @@ -51,6 +51,8 @@ def __init__(self, client: str, server: str, chain: List[Dict], pair: List[Dict] "ambient": [], "server_sidecar": [], "server_grpc": [], + "client_eBPF" : [], + "server_eBPF" : [], } # determine an initial assignment # principle: @@ -85,21 +87,29 @@ def __init__(self, client: str, server: str, chain: List[Dict], pair: List[Dict] client_chain, server_chain = chain[: mid + 1], chain[mid + 1 :] current_mode = "sidecar" for element in client_chain[::-1]: - if ( - "processor" in element + if ("processor" in element + and "eBPF" in element["processor"] + and "sidecar" not in element["processor"]): + current_mode = "eBPF" + elif ("processor" in element and "grpc" in element["processor"] - and "sidecar" not in element["processor"] - ): - current_mode = "grpc" + and "sidecar" not in element["processor"]): + current_mode = "sidecar" if "processor" in element and current_mode not in element["processor"]: raise ValueError("invalid grpc/sidecar requirements") + if current_mode == "eBPF": + TARGET = "eBPF" + elif "grpc" in current_mode: + TARGET = "grpc" + else: + TARGET = "sidecar_wasm" self.elements["client_" + current_mode].insert( 0, AbsElement( element, server=server, initial_position="client", - initial_target="grpc" if "grpc" in current_mode else "sidecar_wasm", + initial_target=TARGET, ), ) current_mode = "sidecar" @@ -218,6 +228,8 @@ def complete_chain(self) -> List[AbsElement]: + self.elements["ambient"] + self.elements["server_sidecar"] + self.elements["server_grpc"] + + self.elements["client_eBPF"] + + self.elements["server_eBPF"] ) def optimize(self, opt_level: str, algorithm: str, dump_property: bool): diff --git a/compiler/graph/ir/element.py b/compiler/graph/ir/element.py index 3e9a55f0..bef1f876 100644 --- a/compiler/graph/ir/element.py +++ b/compiler/graph/ir/element.py @@ -33,7 +33,7 @@ def __init__( server(str): used for generating unique element name consider "cache" on both A->B and A->C initial_position(str): "client" or "server" or "ambient" - initial_target(str): "grpc" or ["sidecar", "ambient"] * ["wasm", "native"] + initial_target(str): "grpc" or ["sidecar", "ambient"] * ["wasm", "native"] or "eBPF" """ if info == "NETWORK" or info == "IPC": self.name = info diff --git a/compiler/graph/ir/optimization.py b/compiler/graph/ir/optimization.py index 74139130..0476fa40 100644 --- a/compiler/graph/ir/optimization.py +++ b/compiler/graph/ir/optimization.py @@ -326,6 +326,7 @@ def cost(chain: List[AbsElement]) -> float: "sidecar_wasm": 3.0, "ambient_native": 1.0, "ambient_wasm": 3.0, + "eBPF": 1.0 } basic_overhead_config = { "client_grpc": 0.5, diff --git a/compiler/main.py b/compiler/main.py index ccc07d2e..0e1992e3 100644 --- a/compiler/main.py +++ b/compiler/main.py @@ -46,7 +46,7 @@ def parse_args(): strong: strict equivalence.""", type=str, choices=["no", "ignore", "weak", "strong"], - default="weak", + default="no", # no: no optimization # ignore: aggresive, ignore equivalence requirements # weak: allow differences in drop rate, records, etc. @@ -127,6 +127,7 @@ def compile_impl( server + "".join(element_names)[:24] ) # Envoy does not allow long struct names os.system(f"mkdir -p {gen_dir}") + print("In def compile_impl() function, before entering main.py's gen_code()") gen_code( element_names, element_paths, @@ -147,13 +148,22 @@ def compile_impl( def generate_element_impl(graphirs: Dict[str, GraphIR], pseudo_impl: bool): compiled_name = set() + print("Enter generate_element_impl, graph_base_dir =", graph_base_dir) + # print(graphirs['frontend->server'].__str__()) gen_dir = os.path.join(graph_base_dir, "generated") + print("gen_dir =", gen_dir) os.system(f"rm {gen_dir} -rf") + for k, v in graphirs.items(): + print("key =", k, "val =", v.__str__()) + # print("val.client =", v.client, "val.server =", v.server, "val.element =", v.elements) for gir in graphirs.values(): # For each edge in the application for element in gir.complete_chain(): # For each element in the edge + print("gir =", gir, "element =", element) + print("type(gir) =", type(gir), "type(element) =", type(element)) identifier = element.lib_name + element.final_position gen_name = element.server + "".join(element.name)[:24] + print("identifier =", identifier, "gen_name =", gen_name, "element.target =", element.target) if element.target in ["mrpc", "grpc"]: element.compile_dir = os.path.join( gen_dir, f"{gen_name}_{element.final_position}_{element.target}" @@ -164,6 +174,7 @@ def generate_element_impl(graphirs: Dict[str, GraphIR], pseudo_impl: bool): ) if identifier not in compiled_name: if pseudo_impl: + # print("Before pseudo_compile") pseudo_compile( element.lib_name, gen_dir, @@ -171,6 +182,7 @@ def generate_element_impl(graphirs: Dict[str, GraphIR], pseudo_impl: bool): element.final_position, ) else: + print("Before compile_impl") compile_impl( element.name, element.path, @@ -224,14 +236,13 @@ def main(args): # Step 1: Parse the spec file and generate graph IRs (see examples/chain for details about spec format) GRAPH_LOG.info(f"Parsing graph spec file {args.spec_path}...") parser = GraphParser() + print("Within Step 1 Pass") graphirs, app_name, app_manifest_file, app_edges = parser.parse(args.spec_path) - if args.verbose: for gir in graphirs.values(): if gir.name not in gir_summary: gir_summary[gir.name] = {"pre-optimized": [], "post-optimized": []} gir_summary[gir.name]["pre-optimized"] = gir.to_rich() - # Step 2: Extract element properties via element compiler and optimize the graph IR. GRAPH_LOG.info("Generating element properties and optimizing the graph IR...") for gir in graphirs.values(): @@ -239,11 +250,13 @@ def main(args): # pseudo_property is set to True when we want to use user-provided properties instead of auto-generated ones for element in gir.complete_chain(): element.set_property_source(args.pseudo_property) + print(f"Before optimize gir = {gir}") gir.optimize(args.opt_level, args.opt_algorithm, args.dump_property) + print(f"After optimize gir = {gir}") if args.opt_level != "no": handle_state(graphirs) - + print("Before Step 3 Pass") # Step 3: Generate backend code for the elements and deployment scripts. if not args.dry_run: GRAPH_LOG.info( @@ -253,6 +266,7 @@ def main(args): # pseudo_impl is set to True when we want to use user-provided implementations instead of auto-generated ones generate_element_impl(graphirs, args.pseudo_impl) # Step 3.2: Generate deployment scripts + print("Before scriptgen, can ignore because it shows more running scripts") scriptgen(graphirs, app_name, app_manifest_file, app_edges) # Dump graphir summary (in yaml) @@ -262,7 +276,8 @@ def main(args): graphir_summary = "" for gir in graphirs.values(): graphir_summary += str(gir) - print(graphir_summary) + print("graphir_summary", graphir_summary) + print("post graphir_summary") graphir_summary_dict = {} for edge_name, gir in graphirs.items(): graphir_summary_dict[edge_name] = gir.export_summary() diff --git a/examples/chain/echo_cp.yaml b/examples/chain/echo_cp.yaml new file mode 100644 index 00000000..38a552a1 --- /dev/null +++ b/examples/chain/echo_cp.yaml @@ -0,0 +1,40 @@ +app_name: echo +app_manifest: /users/xiang95/appnet022725/compiler/examples/applications/echo/echo.yaml +app_structure: +- frontend->server +edge: + frontend->server: + - method: echo + name: fault + path: /users/xiang95/appnet022725/compiler/examples/elements/echo_elements/fault.appnet + position: client + proto: /users/xiang95/appnet022725/go-lib/sample/echo-pb/echo.proto + proto_mod_name: github.com/appnet-org/golib/sample/echo-pb + proto_mod_location: /users/xiang95/appnet022725/go-lib/sample/echo-pb + upgrade: true + envoy_native: true + processor: + - eBPF + # - method: echo + # name: logging + # path: /users/xiang95/appnet022725/compiler/examples/elements/echo_elements/logging.appnet + # position: client + # proto: /users/xiang95/appnet022725/go-lib/sample/echo-pb/echo.proto + # proto_mod_name: github.com/appnet-org/golib/sample/echo-pb + # proto_mod_location: /users/xiang95/appnet022725/go-lib/sample/echo-pb + # upgrade: true + # envoy_native: false + # processor: + # - sidecar + # - method: echo + # name: firwall + # path: /users/xiang95/appnet022725/compiler/examples/elements/echo_elements/firewall.appnet + # position: client + # proto: /users/xiang95/appnet022725/go-lib/sample/echo-pb/echo.proto + # proto_mod_name: github.com/appnet-org/golib/sample/echo-pb + # proto_mod_location: /users/xiang95/appnet022725/go-lib/sample/echo-pb + # upgrade: true + # envoy_native: false + # processor: + # - sidecar +link: {} \ No newline at end of file diff --git a/requirments.txt b/requirements.txt similarity index 100% rename from requirments.txt rename to requirements.txt