Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds event bus, support for streaming events and API usage. #1

Merged
merged 1 commit into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/roseflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require_relative "roseflow/version"

require "roseflow/action"
require "roseflow/action/with_events"
require "roseflow/ai/model"
require "roseflow/ai/provider"
require "roseflow/chat/dialogue"
Expand Down
22 changes: 22 additions & 0 deletions lib/roseflow/action/with_events.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

module Roseflow
module Action
# This module is used to define the CLI interface for an interaction
module WithEvents
def self.extended(base_class)
base_class.extend ClassMethods
end

module ClassMethods
def bus=(bus)
@bus = bus
end

def bus
@bus ||= Registry.get(:events)
end
end
end
end
end
10 changes: 8 additions & 2 deletions lib/roseflow/ai/model.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require "roseflow/model_repository"
require "roseflow/provider_repository"
require "roseflow/ai/models/configuration"
require "roseflow/ai/models/instance_factory"
require "roseflow/ai/models/openai_adapter"
require "roseflow/ai/models/openrouter_adapter"
Expand All @@ -11,9 +12,9 @@ module AI
class ModelInstanceNotFoundError < StandardError; end

class Model
attr_reader :name
attr_reader :name, :config

delegate :chat, :embed, to: :instance
delegate :chat, :embed, :config, to: :instance

def initialize(name: nil, provider: nil)
raise ArgumentError, "Name must be provided" if name.nil?
Expand All @@ -22,6 +23,11 @@ def initialize(name: nil, provider: nil)
@name = name
@instance = instance
@_provider = provider
# @config = instance.configuration
end

def config=(config)
@instance.configuration = config if config.is_a?(Models::Configuration)
end

def provider
Expand Down
4 changes: 4 additions & 0 deletions lib/roseflow/ai/model_interface.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ def completion(**options)
def operations
raise NotImplementedError, "Model must implement #operations"
end

def configuration
raise NotImplementedError, "Model must implement #configuration"
end
end
end
end
24 changes: 24 additions & 0 deletions lib/roseflow/ai/models/configuration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

module Roseflow
module AI
module Models
class Configuration < Anyway::Config
config_name :ai_model

attr_config :name
attr_config instrumentation: false
attr_config temperature: 1.0
attr_config top_p: 1.0
attr_config n: 1
attr_config stream: false
attr_config stream_events: false
attr_config stop: nil
attr_config max_tokens: 2048
attr_config presence_penalty: 0.0
attr_config frequency_penalty: 0.0
attr_config user: nil
end
end
end
end
20 changes: 19 additions & 1 deletion lib/roseflow/ai/models/openai_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,24 @@ module Models
class OpenAIAdapter < BaseAdapter
include ModelInterface

def configuration
@configuration ||= Models::Configuration.new(name: @model.name)
end

alias_method :config, :configuration

def configuration=(config)
@configuration = config
end

def call(operation, options, &block)
@model.call(operation, options, &block)
end

def chat(options, &block)
@model.chat(options.delete(:messages), options, &block)
response = @model.chat(options.delete(:messages), options, &block)
publish_api_usage(response.usage) if @configuration.instrumentation
response
end

def embed(options)
Expand All @@ -24,6 +36,12 @@ def embed(options)
def operations
@model.operations
end

private

def publish_api_usage(usage)
Registry.get(:events).publish(:api_usage_event, usage: usage.to_h)
end
end
end
end
Expand Down
9 changes: 9 additions & 0 deletions lib/roseflow/event_bus.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

require "omnes"

module Roseflow
class EventBus
include Omnes
end
end
8 changes: 8 additions & 0 deletions lib/roseflow/events.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

module Roseflow
module Events
end
end

require "roseflow/events/model/streaming_event"
24 changes: 24 additions & 0 deletions lib/roseflow/events/model/streaming_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

require "omnes"

module Roseflow
module Events
module Model
class StreamingEvent
include Omnes::Event

attr_reader :body, :stream_id

def initialize(body:, stream_id:)
@body = body
@stream_id = stream_id
end

def omnes_event_name
:model_streaming_event
end
end
end
end
end
17 changes: 17 additions & 0 deletions lib/roseflow/events/model_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

require "omnes"

module Roseflow
module Events
class ModelEvent
include Omnes::Event

def initialize(model:, provider:, data:)
@model = model.name
@provider = provider.name
@data = data
end
end
end
end
6 changes: 6 additions & 0 deletions lib/roseflow/registry.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# frozen_string_literal: true

require "roseflow/event_bus"
require "roseflow/model_repository"
require "roseflow/provider_repository"

module Roseflow
class UnknownRegistryKeyError < StandardError; end

Expand Down Expand Up @@ -38,6 +42,8 @@ def initialize_instance(key)
register(:providers, ProviderRepository.new)
when :models
register(:models, ModelRepository.new)
when :events
register(:events, EventBus.new)
when :default_model
register(:default_model, AI::Model.load("gpt-3.5-turbo"))
end
Expand Down
1 change: 1 addition & 0 deletions roseflow.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Gem::Specification.new do |spec|
spec.add_dependency "google-protobuf", "~> 3.23"
spec.add_dependency "hashie", "~> 5.0"
spec.add_dependency "light-service", "~> 0.18"
spec.add_dependency "omnes"
spec.add_dependency "phlex", "~> 1.8.1"
spec.add_dependency "ulid-ruby", "~> 1.0"

Expand Down
63 changes: 63 additions & 0 deletions spec/ai/model_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module AI
expect(model).to be_a(Model)
expect(model.name).to eq "gpt-3.5-turbo"
expect(model.provider).to eq :openai
expect(model.config).to be_a(Models::Configuration)
end
end

Expand All @@ -26,6 +27,18 @@ module AI
end
end

describe "configuration" do
let(:model) { described_class.new(name: "gpt-3.5-turbo") }

it "has a configuration" do
VCR.use_cassette("ai/model/initialize") do
expect(model).to be_a(Model)
config = model.config
expect(config.max_tokens).to eq 2048
end
end
end

describe "#operations" do
it "returns a list of operations" do
VCR.use_cassette("ai/model/operations") do
Expand All @@ -36,6 +49,56 @@ module AI
end
end
end

describe "SSE events" do
before do
Registry.get(:events).register(:model_streaming_event)
end

let(:model) { Registry.get(:models).find("gpt-3.5-turbo") }
let(:config) { Models::Configuration.new(stream_events: true) }
let(:messages) {
[
Roseflow::Chat::Message.new(role: "system", content: "Count from 1 to 5.")
]
}
let(:subscriber) { TestDoubles::StreamingEventSubscriber.new }

it "publishes streaming SSE events" do
subscriber.subscribe_to(Registry.get(:events))

VCR.use_cassette("ai/model/streaming", record: :new_episodes) do
model.config = config
expect(model.config.stream_events).to be_truthy
expect(model.chat(model: model.name, messages: messages, stream: true, stream_events: true)).to be_success
end
end
end

describe "API usage" do
before do
Registry.get(:events).register(:api_usage_event)
end

let(:model) { Registry.get(:models).find("gpt-3.5-turbo") }
let(:config) { Models::Configuration.new(instrumentation: true) }
let(:messages) {
[
Roseflow::Chat::Message.new(role: "system", content: "Count from 1 to 5.")
]
}
let(:subscriber) { TestDoubles::ApiUsageEventSubscriber.new }

it "publishes API usage" do
subscriber.subscribe_to(Registry.get(:events))

VCR.use_cassette("ai/model/api_usage") do
model.config = config
expect(model.config.instrumentation).to be_truthy
expect(model.chat(model: model.name, messages: messages)).to be_success
end
end
end
end
end # AI
end # Roseflow
1 change: 1 addition & 0 deletions spec/embeddings/embedding_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module Embeddings
describe "#call" do
subject(:no_input) { described_class.new }
subject(:no_model) { described_class.new(input: "test") }

let(:model) { Registry.get(:models).find("text-embedding-ada-002") }

subject { described_class.new(input: "test", model: model) }
Expand Down
33 changes: 33 additions & 0 deletions spec/event_bus_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

require "spec_helper"
require "roseflow/event_bus"

module Roseflow
RSpec.describe EventBus do
subject { described_class.new }

it "does not share buses between instances" do
klass = described_class.new

expect(klass.omnes_bus).not_to be(subject.omnes_bus)
end

describe "events" do
describe "from actions" do
before do
Registry.get(:events).register(:action_event)
end

let(:action) { TestDoubles::EventedAction.execute }
let(:subscriber) { TestDoubles::ActionEventSubscriber.new }

it "receives events from actions" do
expect(subscriber).to receive(:handler)
subscriber.subscribe_to(Registry.get(:events))
expect(action).to be_success
end
end
end
end
end
Loading
Loading