Skip to content

Commit

Permalink
Merge pull request #1 from roseflow-ai/omnes
Browse files Browse the repository at this point in the history
Adds event bus, support for streaming events and API usage.
  • Loading branch information
ljuti committed Aug 10, 2023
2 parents e5770f7 + 43c7b68 commit 5b1a16d
Show file tree
Hide file tree
Showing 19 changed files with 472 additions and 3 deletions.
1 change: 1 addition & 0 deletions lib/roseflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require_relative "roseflow/version"

require "roseflow/action"
require "roseflow/action/with_events"
require "roseflow/ai/model"
require "roseflow/ai/provider"
require "roseflow/chat/dialogue"
Expand Down
22 changes: 22 additions & 0 deletions lib/roseflow/action/with_events.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

module Roseflow
module Action
# This module is used to define the CLI interface for an interaction
module WithEvents
def self.extended(base_class)
base_class.extend ClassMethods
end

module ClassMethods
def bus=(bus)
@bus = bus
end

def bus
@bus ||= Registry.get(:events)
end
end
end
end
end
10 changes: 8 additions & 2 deletions lib/roseflow/ai/model.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require "roseflow/model_repository"
require "roseflow/provider_repository"
require "roseflow/ai/models/configuration"
require "roseflow/ai/models/instance_factory"
require "roseflow/ai/models/openai_adapter"
require "roseflow/ai/models/openrouter_adapter"
Expand All @@ -11,9 +12,9 @@ module AI
class ModelInstanceNotFoundError < StandardError; end

class Model
attr_reader :name
attr_reader :name, :config

delegate :chat, :embed, to: :instance
delegate :chat, :embed, :config, to: :instance

def initialize(name: nil, provider: nil)
raise ArgumentError, "Name must be provided" if name.nil?
Expand All @@ -22,6 +23,11 @@ def initialize(name: nil, provider: nil)
@name = name
@instance = instance
@_provider = provider
# @config = instance.configuration
end

def config=(config)
@instance.configuration = config if config.is_a?(Models::Configuration)
end

def provider
Expand Down
4 changes: 4 additions & 0 deletions lib/roseflow/ai/model_interface.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ def completion(**options)
def operations
raise NotImplementedError, "Model must implement #operations"
end

def configuration
raise NotImplementedError, "Model must implement #configuration"
end
end
end
end
24 changes: 24 additions & 0 deletions lib/roseflow/ai/models/configuration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

module Roseflow
module AI
module Models
class Configuration < Anyway::Config
config_name :ai_model

attr_config :name
attr_config instrumentation: false
attr_config temperature: 1.0
attr_config top_p: 1.0
attr_config n: 1
attr_config stream: false
attr_config stream_events: false
attr_config stop: nil
attr_config max_tokens: 2048
attr_config presence_penalty: 0.0
attr_config frequency_penalty: 0.0
attr_config user: nil
end
end
end
end
20 changes: 19 additions & 1 deletion lib/roseflow/ai/models/openai_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,24 @@ module Models
class OpenAIAdapter < BaseAdapter
include ModelInterface

def configuration
@configuration ||= Models::Configuration.new(name: @model.name)
end

alias_method :config, :configuration

def configuration=(config)
@configuration = config
end

def call(operation, options, &block)
@model.call(operation, options, &block)
end

def chat(options, &block)
@model.chat(options.delete(:messages), options, &block)
response = @model.chat(options.delete(:messages), options, &block)
publish_api_usage(response.usage) if @configuration.instrumentation
response
end

def embed(options)
Expand All @@ -24,6 +36,12 @@ def embed(options)
def operations
@model.operations
end

private

def publish_api_usage(usage)
Registry.get(:events).publish(:api_usage_event, usage: usage.to_h)
end
end
end
end
Expand Down
9 changes: 9 additions & 0 deletions lib/roseflow/event_bus.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

require "omnes"

module Roseflow
class EventBus
include Omnes
end
end
8 changes: 8 additions & 0 deletions lib/roseflow/events.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

module Roseflow
module Events
end
end

require "roseflow/events/model/streaming_event"
24 changes: 24 additions & 0 deletions lib/roseflow/events/model/streaming_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

require "omnes"

module Roseflow
module Events
module Model
class StreamingEvent
include Omnes::Event

attr_reader :body, :stream_id

def initialize(body:, stream_id:)
@body = body
@stream_id = stream_id
end

def omnes_event_name
:model_streaming_event
end
end
end
end
end
17 changes: 17 additions & 0 deletions lib/roseflow/events/model_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

require "omnes"

module Roseflow
module Events
class ModelEvent
include Omnes::Event

def initialize(model:, provider:, data:)
@model = model.name
@provider = provider.name
@data = data
end
end
end
end
6 changes: 6 additions & 0 deletions lib/roseflow/registry.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# frozen_string_literal: true

require "roseflow/event_bus"
require "roseflow/model_repository"
require "roseflow/provider_repository"

module Roseflow
class UnknownRegistryKeyError < StandardError; end

Expand Down Expand Up @@ -38,6 +42,8 @@ def initialize_instance(key)
register(:providers, ProviderRepository.new)
when :models
register(:models, ModelRepository.new)
when :events
register(:events, EventBus.new)
when :default_model
register(:default_model, AI::Model.load("gpt-3.5-turbo"))
end
Expand Down
1 change: 1 addition & 0 deletions roseflow.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Gem::Specification.new do |spec|
spec.add_dependency "google-protobuf", "~> 3.23"
spec.add_dependency "hashie", "~> 5.0"
spec.add_dependency "light-service", "~> 0.18"
spec.add_dependency "omnes"
spec.add_dependency "phlex", "~> 1.8.1"
spec.add_dependency "ulid-ruby", "~> 1.0"

Expand Down
63 changes: 63 additions & 0 deletions spec/ai/model_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module AI
expect(model).to be_a(Model)
expect(model.name).to eq "gpt-3.5-turbo"
expect(model.provider).to eq :openai
expect(model.config).to be_a(Models::Configuration)
end
end

Expand All @@ -26,6 +27,18 @@ module AI
end
end

describe "configuration" do
let(:model) { described_class.new(name: "gpt-3.5-turbo") }

it "has a configuration" do
VCR.use_cassette("ai/model/initialize") do
expect(model).to be_a(Model)
config = model.config
expect(config.max_tokens).to eq 2048
end
end
end

describe "#operations" do
it "returns a list of operations" do
VCR.use_cassette("ai/model/operations") do
Expand All @@ -36,6 +49,56 @@ module AI
end
end
end

describe "SSE events" do
before do
Registry.get(:events).register(:model_streaming_event)
end

let(:model) { Registry.get(:models).find("gpt-3.5-turbo") }
let(:config) { Models::Configuration.new(stream_events: true) }
let(:messages) {
[
Roseflow::Chat::Message.new(role: "system", content: "Count from 1 to 5.")
]
}
let(:subscriber) { TestDoubles::StreamingEventSubscriber.new }

it "publishes streaming SSE events" do
subscriber.subscribe_to(Registry.get(:events))

VCR.use_cassette("ai/model/streaming", record: :new_episodes) do
model.config = config
expect(model.config.stream_events).to be_truthy
expect(model.chat(model: model.name, messages: messages, stream: true, stream_events: true)).to be_success
end
end
end

describe "API usage" do
before do
Registry.get(:events).register(:api_usage_event)
end

let(:model) { Registry.get(:models).find("gpt-3.5-turbo") }
let(:config) { Models::Configuration.new(instrumentation: true) }
let(:messages) {
[
Roseflow::Chat::Message.new(role: "system", content: "Count from 1 to 5.")
]
}
let(:subscriber) { TestDoubles::ApiUsageEventSubscriber.new }

it "publishes API usage" do
subscriber.subscribe_to(Registry.get(:events))

VCR.use_cassette("ai/model/api_usage") do
model.config = config
expect(model.config.instrumentation).to be_truthy
expect(model.chat(model: model.name, messages: messages)).to be_success
end
end
end
end
end # AI
end # Roseflow
1 change: 1 addition & 0 deletions spec/embeddings/embedding_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module Embeddings
describe "#call" do
subject(:no_input) { described_class.new }
subject(:no_model) { described_class.new(input: "test") }

let(:model) { Registry.get(:models).find("text-embedding-ada-002") }

subject { described_class.new(input: "test", model: model) }
Expand Down
33 changes: 33 additions & 0 deletions spec/event_bus_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

require "spec_helper"
require "roseflow/event_bus"

module Roseflow
RSpec.describe EventBus do
subject { described_class.new }

it "does not share buses between instances" do
klass = described_class.new

expect(klass.omnes_bus).not_to be(subject.omnes_bus)
end

describe "events" do
describe "from actions" do
before do
Registry.get(:events).register(:action_event)
end

let(:action) { TestDoubles::EventedAction.execute }
let(:subscriber) { TestDoubles::ActionEventSubscriber.new }

it "receives events from actions" do
expect(subscriber).to receive(:handler)
subscriber.subscribe_to(Registry.get(:events))
expect(action).to be_success
end
end
end
end
end
Loading

0 comments on commit 5b1a16d

Please sign in to comment.