Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support middleware #83

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ clean:

.PHONY: docs
docs:
mix docs --open
mix docs -f html --open

.PHONY: changelog
changelog:
Expand Down
8 changes: 3 additions & 5 deletions lib/inngest/function/input.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,15 @@ defmodule Inngest.Function.Context do
"""

defstruct [
:attempt,
:run_id,
:middleware,
# ETS table
:index,
steps: %{}
]

@type t() :: %__MODULE__{
attempt: number(),
run_id: binary(),
index: :ets.tid(),
steps: map()
steps: map(),
middleware: map()
}
end
47 changes: 47 additions & 0 deletions lib/inngest/router/helper.ex → lib/inngest/handler/helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ defmodule Inngest.Router.Helper do
end
end

def load_middleware(params) do
if Config.path_runtime_eval() do
%{middleware: middleware} = load_middleware_from_path(params)
middleware
else
Map.get(params, :middleware, [])
end
end

@spec load_functions_from_path(map()) :: map()
def load_functions_from_path(%{path: paths} = kv) when is_list(paths) do
modules =
Expand All @@ -25,6 +34,28 @@ defmodule Inngest.Router.Helper do
Map.put(kv, :funcs, modules)
end

def load_functions_from_path(%{functions: funcs} = kv) when is_list(funcs) do
modules =
funcs
|> Enum.map(&Path.wildcard/1)
|> List.flatten()
|> Stream.filter(&(!File.dir?(&1)))
|> Enum.uniq()
|> extract_modules()

Map.put(kv, :funcs, modules)
end

def load_functions_from_path(%{functions: funcs} = kv) when is_binary(funcs) do
modules =
funcs
|> Path.wildcard()
|> Enum.filter(&(!File.dir?(&1)))
|> extract_modules()

Map.put(kv, :funcs, modules)
end

def load_functions_from_path(%{path: path} = kv) when is_binary(path) do
modules =
path
Expand All @@ -37,6 +68,22 @@ defmodule Inngest.Router.Helper do

def load_functions_from_path(kv), do: kv

@spec load_middleware_from_path(map()) :: map()
def load_middleware_from_path(%{middleware: middleware} = kv) when is_list(middleware) do
# modules =
# middleware
# |> IO.inspect()
# |> Enum.map(&Path.wildcard/1)
# |> List.flatten()
# |> Stream.filter(&(!File.dir?(&1)))
# |> Enum.uniq()
# |> extract_modules()

Map.put(kv, :middleware, middleware)
end

def load_middleware_from_path(kv), do: kv

defp extract_modules(files) do
files
|> Enum.flat_map(fn file ->
Expand Down
59 changes: 46 additions & 13 deletions lib/inngest/router/invoke.ex → lib/inngest/handler/invoke.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,36 +38,69 @@ defmodule Inngest.Router.Invoke do
%{private: %{raw_body: [body]}} = conn,
%{"event" => event, "events" => events, "ctx" => ctx, "fnId" => fn_slug} = params
) do
input = %Inngest.Function.Input{
attempt: Map.get(ctx, "attempt", 0),
event: Inngest.Event.from(event),
events: Enum.map(events, &Inngest.Event.from/1),
run_id: Map.get(ctx, "run_id"),
step: Inngest.StepTool
}

# prepare steps to be passed into middlewares
steps =
case get_in(params, ["ctx", "stack", "stack"]) do
nil ->
[]

stack ->
Enum.into(stack, [], fn hash ->
data = get_in(params, ["steps", hash])
%{id: hash, data: data}
end)
end

func =
params
|> load_functions()
|> Enum.find(fn func ->
Enum.member?(func.slugs(), fn_slug)
end)

ctx = %Inngest.Function.Context{
attempt: Map.get(ctx, "attempt", 0),
run_id: Map.get(ctx, "run_id"),
steps: Map.get(params, "steps"),
index: :ets.new(:index, [:set, :private])
}
# Initialize middlewares
middleware =
params
|> load_middleware()
|> Enum.into(%{}, fn mid ->
arg = %{input: input, func: func, steps: steps}
opts = mid.init(arg)
{mid.name(), %{opts: opts, mid: mid}}
end)

input = %Inngest.Function.Input{
event: Inngest.Event.from(event),
events: Enum.map(events, &Inngest.Event.from/1),
run_id: Map.get(ctx, "run_id"),
step: Inngest.StepTool
# Transform inputs
steps =
steps
# TODO: Apply each middleware to the step data
# |> Stream.map(fn step ->
# end)
|> Enum.into(%{}, fn %{id: id, data: data} ->
{id, data}
end)

fnctx = %Inngest.Function.Context{
steps: steps,
middleware: middleware,
index: :ets.new(:index, [:set, :private])
}

resp =
case Config.is_dev() do
true ->
invoke(func, ctx, input)
invoke(func, fnctx, input)

false ->
with sig <- conn |> Plug.Conn.get_req_header(Headers.signature()) |> List.first(),
true <- Signature.signing_key_valid?(sig, Config.signing_key(), body) do
invoke(func, ctx, input)
invoke(func, fnctx, input)
else
_ ->
SdkResponse.from_result({:error, "unable to verify signature"}, retry: false)
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions lib/inngest/router/plug.ex → lib/inngest/handler/plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ defmodule Inngest.Router.Plug do
end
|> Enum.into(%{})
|> Inngest.Router.Helper.load_functions_from_path()
|> Inngest.Router.Helper.load_middleware_from_path()
|> Map.put(:framework, @framework)
|> Macro.escape()

Expand Down
File renamed without changes.
63 changes: 63 additions & 0 deletions lib/inngest/middleware.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
defmodule Inngest.Middleware do
@moduledoc """
Inngest Middleware specification
"""

@type opts ::
binary()
| tuple()
| atom()
| integer()
| float()
| [opts]
| map()

@type init_args :: %{
input: Inngest.Function.Input,
func: Inngest.Function,
steps: list(map())
}

@type input_args :: %{
ctx: Inngest.Function.Input,
steps: list(map())
}

@type input_ret :: %{
ctx: Inngest.Function.Input,
steps: list(map())
}

@type output_args :: %{
result: %{data: any()},
step: Inngest.GeneratorOpCode.t() | nil
}

@type output_ret :: %{
result: %{data: any()}
}

@callback name() :: binary()

# NOTE: what to make available on init?
@callback init(init_args) :: opts

@callback transform_input(input_args, opts) :: input_ret

@callback transform_output(output_args, opts) :: output_ret

@callback before_memoization(opts) :: :ok

@callback after_memoization(opts) :: :ok

@callback before_execution(opts) :: :ok

@callback after_execution(opts) :: :ok

@optional_callbacks transform_input: 2,
transform_output: 2,
before_memoization: 1,
after_memoization: 1,
before_execution: 1,
after_execution: 1
end
47 changes: 47 additions & 0 deletions test/support/cases/middleware/test.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
defmodule Inngest.Test.Case.Middleware.Test do
@moduledoc false

@behaviour Inngest.Middleware

@impl true
def init(_), do: []

@impl true
def name(), do: "test"

@impl true
def transform_input(input_args, _opts) do
IO.inspect("Transform input")
input_args
end

@impl true
def transform_output(output_args, _opts) do
IO.inspect("Transform outputx")

Check warning on line 20 in test/support/cases/middleware/test.ex

View workflow job for this annotation

GitHub Actions / Linter

There should be no calls to IO.inspect/1.
output_args
end

@impl true
def before_memoization(_opts) do
IO.inspect("Before memoization")

Check warning on line 26 in test/support/cases/middleware/test.ex

View workflow job for this annotation

GitHub Actions / Linter

There should be no calls to IO.inspect/1.
:ok
end

@impl true
def after_memoization(_opts) do
IO.inspect("After memoization")

Check warning on line 32 in test/support/cases/middleware/test.ex

View workflow job for this annotation

GitHub Actions / Linter

There should be no calls to IO.inspect/1.
:ok
end

@impl true
def before_execution(_opts) do
IO.inspect("Before execution")

Check warning on line 38 in test/support/cases/middleware/test.ex

View workflow job for this annotation

GitHub Actions / Linter

There should be no calls to IO.inspect/1.
:ok
end

@impl true
def after_execution(_opts) do
IO.inspect("After execution")

Check warning on line 44 in test/support/cases/middleware/test.ex

View workflow job for this annotation

GitHub Actions / Linter

There should be no calls to IO.inspect/1.
:ok
end
end
7 changes: 6 additions & 1 deletion test/support/router/plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ defmodule Inngest.Test.PlugRouter do
|> send_resp(200, data)
end

inngest("/api/inngest", path: "test/support/cases/*")
inngest("/api/inngest",
functions: "test/support/cases/*",
middleware: [
Inngest.Test.Case.Middleware.Test
]
)

match _ do
send_resp(conn, 404, "oops\n")
Expand Down
Loading