Skip to content

Commit

Permalink
feat: add fallback sink support (#157)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Apr 18, 2024
1 parent 252ca16 commit 0c2501b
Show file tree
Hide file tree
Showing 17 changed files with 196 additions and 57 deletions.
4 changes: 4 additions & 0 deletions examples/sink/async_log/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ async def handler(self, datums: AsyncIterable[Datum]) -> Responses:
async for msg in datums:
_LOGGER.info("User Defined Sink %s", msg.value.decode("utf-8"))
responses.append(Response.as_success(msg.id))
# if we are not able to write to sink and if we have a fallback sink configured
# we can use Response.as_fallback(msg.id)) to write the message to fallback sink
return responses


Expand All @@ -22,6 +24,8 @@ async def udsink_handler(datums: AsyncIterable[Datum]) -> Responses:
async for msg in datums:
_LOGGER.info("User Defined Sink %s", msg.value.decode("utf-8"))
responses.append(Response.as_success(msg.id))
# if we are not able to write to sink and if we have a fallback sink configured
# we can use Response.as_fallback(msg.id)) to write the message to fallback sink
return responses


Expand Down
2 changes: 1 addition & 1 deletion examples/sink/log/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ image:

.PHONY: clean
clean:
rm -rf ./dist
rm -rf ./dist
4 changes: 4 additions & 0 deletions examples/sink/log/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ def handler(self, datums: Iterator[Datum]) -> Responses:
for msg in datums:
_LOGGER.info("User Defined Sink %s", msg.value.decode("utf-8"))
responses.append(Response.as_success(msg.id))
# if we are not able to write to sink and if we have a fallback sink configured
# we can use Response.as_fallback(msg.id)) to write the message to fallback sink
return responses


Expand All @@ -24,6 +26,8 @@ def udsink_handler(datums: Iterator[Datum]) -> Responses:
"User Defined Sink: Payload %s , Headers %s", msg.value.decode("utf-8"), msg.headers
)
responses.append(Response.as_success(msg.id))
# if we are not able to write to sink and if we have a fallback sink configured
# we can use Response.as_fallback(msg.id)) to write the message to fallback sink
return responses


Expand Down
5 changes: 5 additions & 0 deletions pynumaflow/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
SOURCE_SOCK_PATH = "/var/run/numaflow/source.sock"
MULTIPROC_MAP_SOCK_PORT = 55551
MULTIPROC_MAP_SOCK_ADDR = "0.0.0.0"
FALLBACK_SINK_SOCK_PATH = "/var/run/numaflow/fb-sink.sock"

# Server information file configs
MAP_SERVER_INFO_FILE_PATH = "/var/run/numaflow/mapper-server-info"
Expand All @@ -27,6 +28,10 @@
SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sinker-server-info"
SIDE_INPUT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sideinput-server-info"
SOURCE_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sourcer-server-info"
FALLBACK_SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/fb-sinker-server-info"

ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE"
UD_CONTAINER_FALLBACK_SINK = "fb-udsink"

# TODO: need to make sure the DATUM_KEY value is the same as
# https://github.com/numaproj/numaflow-go/blob/main/pkg/function/configs.go#L6
Expand Down
14 changes: 11 additions & 3 deletions pynumaflow/proto/sinker/sink.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

Expand Down Expand Up @@ -33,15 +32,24 @@ message ReadyResponse {
bool ready = 1;
}

/*
* Status is the status of the response.
*/
enum Status {
SUCCESS = 0;
FAILURE = 1;
FALLBACK = 2;
}

/**
* SinkResponse is the individual response of each message written to the sink.
*/
message SinkResponse {
message Result {
// id is the ID of the message, can be used to uniquely identify the message.
string id = 1;
// success denotes the status of persisting to disk. if set to false, it means writing to sink for the message failed.
bool success = 2;
// status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK.
Status status = 2;
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
}
Expand Down
16 changes: 9 additions & 7 deletions pynumaflow/proto/sinker/sink_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 15 additions & 3 deletions pynumaflow/sinker/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,33 @@ class Response:
id: the id of the event.
success: boolean indicating whether the event was successfully processed.
err: error message if the event was not successfully processed.
fallback: fallback is true if the message to be sent to the fallback sink.
"""

id: str
success: bool
err: Optional[str]
fallback: bool

__slots__ = ("id", "success", "err")
__slots__ = ("id", "success", "err", "fallback")

# as_success creates a successful Response with the given id.
# The Success field is set to true.
@classmethod
def as_success(cls: type[R], id_: str) -> R:
return Response(id=id_, success=True, err=None)
return Response(id=id_, success=True, err=None, fallback=False)

# as_failure creates a failed Response with the given id and error message.
# The success field is set to false and the err field is set to the provided error message.
@classmethod
def as_failure(cls: type[R], id_: str, err_msg: str) -> R:
return Response(id=id_, success=False, err=err_msg)
return Response(id=id_, success=False, err=err_msg, fallback=False)

# as_fallback creates a Response with the fallback field set to true.
# This indicates that the message should be sent to the fallback sink.
@classmethod
def as_fallback(cls: type[R], id_: str) -> R:
return Response(id=id_, fallback=True, err=None, success=False)


class Responses(Sequence[R]):
Expand Down
12 changes: 12 additions & 0 deletions pynumaflow/sinker/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
MAX_MESSAGE_SIZE,
MAX_THREADS,
SINK_SERVER_INFO_FILE_PATH,
ENV_UD_CONTAINER_TYPE,
UD_CONTAINER_FALLBACK_SINK,
_LOGGER,
FALLBACK_SINK_SOCK_PATH,
FALLBACK_SINK_SERVER_INFO_FILE_PATH,
)

from pynumaflow.shared.server import NumaflowServer, start_async_server
Expand Down Expand Up @@ -74,6 +79,12 @@ def __init__(
max_threads=MAX_THREADS,
server_info_file=SINK_SERVER_INFO_FILE_PATH,
):
# If the container type is fallback sink, then use the fallback sink address and path.
if os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_FALLBACK_SINK:
_LOGGER.info("Using Fallback Sink")
sock_path = FALLBACK_SINK_SOCK_PATH
server_info_file = FALLBACK_SINK_SERVER_INFO_FILE_PATH

self.sock_path = f"unix://{sock_path}"
self.max_threads = min(max_threads, int(os.getenv("MAX_THREADS", "4")))
self.max_message_size = max_message_size
Expand All @@ -85,6 +96,7 @@ def __init__(
("grpc.max_send_message_length", self.max_message_size),
("grpc.max_receive_message_length", self.max_message_size),
]

self.servicer = AsyncSinkServicer(sinker_instance)

def start(self):
Expand Down
11 changes: 10 additions & 1 deletion pynumaflow/sinker/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
_LOGGER,
UDFType,
SINK_SERVER_INFO_FILE_PATH,
ENV_UD_CONTAINER_TYPE,
UD_CONTAINER_FALLBACK_SINK,
FALLBACK_SINK_SOCK_PATH,
FALLBACK_SINK_SERVER_INFO_FILE_PATH,
)

from pynumaflow.shared.server import NumaflowServer, sync_server_start
Expand Down Expand Up @@ -72,6 +76,12 @@ def udsink_handler(datums: Iterator[Datum]) -> Responses:
grpc_server.start()
"""
# If the container type is fallback sink, then use the fallback sink address and path.
if os.getenv(ENV_UD_CONTAINER_TYPE, "") == UD_CONTAINER_FALLBACK_SINK:
_LOGGER.info("Using Fallback Sink")
sock_path = FALLBACK_SINK_SOCK_PATH
server_info_file = FALLBACK_SINK_SERVER_INFO_FILE_PATH

self.sock_path = f"unix://{sock_path}"
self.max_threads = min(max_threads, int(os.getenv("MAX_THREADS", "4")))
self.max_message_size = max_message_size
Expand All @@ -83,7 +93,6 @@ def udsink_handler(datums: Iterator[Datum]) -> Responses:
("grpc.max_send_message_length", self.max_message_size),
("grpc.max_receive_message_length", self.max_message_size),
]

self.servicer = SyncSinkServicer(sinker_instance)

def start(self):
Expand Down
5 changes: 2 additions & 3 deletions pynumaflow/sinker/servicer/async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pynumaflow.sinker._dtypes import Responses, Datum, Response
from pynumaflow.sinker._dtypes import SyncSinkCallable
from pynumaflow.proto.sinker import sink_pb2_grpc, sink_pb2
from pynumaflow.sinker.servicer.utils import build_sink_response
from pynumaflow.types import NumaflowServicerContext
from pynumaflow._constants import _LOGGER

Expand Down Expand Up @@ -64,9 +65,7 @@ async def __invoke_sink(self, datum_iterator: AsyncIterable[Datum]):
rspns.append(Response.as_failure(_datum.id, err_msg))
responses = []
for rspn in rspns:
responses.append(
sink_pb2.SinkResponse.Result(id=rspn.id, success=rspn.success, err_msg=rspn.err)
)
responses.append(build_sink_response(rspn))
return responses

async def IsReady(
Expand Down
5 changes: 2 additions & 3 deletions pynumaflow/sinker/servicer/sync_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pynumaflow.sinker._dtypes import Responses, Datum, Response
from pynumaflow.sinker._dtypes import SyncSinkCallable
from pynumaflow.proto.sinker import sink_pb2_grpc, sink_pb2
from pynumaflow.sinker.servicer.utils import build_sink_response
from pynumaflow.types import NumaflowServicerContext


Expand Down Expand Up @@ -54,9 +55,7 @@ def SinkFn(

responses = []
for rspn in rspns:
responses.append(
sink_pb2.SinkResponse.Result(id=rspn.id, success=rspn.success, err_msg=rspn.err)
)
responses.append(build_sink_response(rspn))

return sink_pb2.SinkResponse(results=responses)

Expand Down
13 changes: 13 additions & 0 deletions pynumaflow/sinker/servicer/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from pynumaflow.proto.sinker import sink_pb2
from pynumaflow.sinker._dtypes import Response


def build_sink_response(rspn: Response):
if rspn.success:
return sink_pb2.SinkResponse.Result(id=rspn.id, status=sink_pb2.Status.SUCCESS)
elif rspn.fallback:
return sink_pb2.SinkResponse.Result(id=rspn.id, status=sink_pb2.Status.FALLBACK)
else:
return sink_pb2.SinkResponse.Result(
id=rspn.id, status=sink_pb2.Status.FAILURE, err_msg=rspn.err
)
5 changes: 0 additions & 5 deletions tests/map/test_multiproc_mapper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os
import unittest
from unittest import mock

import grpc
from google.protobuf import empty_pb2 as _empty_pb2
Expand All @@ -19,10 +18,6 @@
)


def mockenv(**envvars):
return mock.patch.dict(os.environ, envvars)


class TestMultiProcMethods(unittest.TestCase):
def setUp(self) -> None:
my_server = MapMultiprocServer(mapper_instance=map_handler)
Expand Down
Loading

0 comments on commit 0c2501b

Please sign in to comment.