Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions agency.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ type OperationConfig struct {
Messages []Message
}

func (p *Operation) Config() *OperationConfig {
return p.config
}

// NewOperation allows to create an operation from a function.
func NewOperation(handler OperationHandler) *Operation {
return &Operation{
Expand Down
2 changes: 1 addition & 1 deletion examples/custom_operation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func main() {
increment := agency.NewOperation(incrementFunc)

msg, err := agency.NewProcess(
msg, err := agency.ProcessFromOperations(
increment, increment, increment,
).Execute(context.Background(), agency.UserMessage("0"))

Expand Down
4 changes: 2 additions & 2 deletions examples/logging/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func main() {
factory := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")})
params := openai.TextToTextParams{Model: "gpt-3.5-turbo"}

_, err := agency.NewProcess(
_, err := agency.ProcessFromOperations(
factory.TextToText(params).SetPrompt("explain what that means"),
factory.TextToText(params).SetPrompt("translate to russian"),
factory.TextToText(params).SetPrompt("replace all spaces with '_'"),
Expand All @@ -31,6 +31,6 @@ func main() {
}
}

func Logger(input, output agency.Message, cfg *agency.OperationConfig) {
func Logger(input, output agency.Message, cfg *agency.OperationConfig, _ uint) {
Copy link
Contributor Author

@emil14 emil14 Dec 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Pass history here?
  2. Combine all arguments into single structure?

P.S. - last parameter is step-index, interface of the interceptor was changed and interceptor was renamed to observer

fmt.Printf("in: %v\nprompt: %v\nout: %v\n\n", input, cfg.Prompt, output)
}
59 changes: 59 additions & 0 deletions examples/process_history/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"context"
"fmt"

_ "github.com/joho/godotenv/autoload"

"github.com/neurocult/agency"
"github.com/neurocult/agency/providers/openai"
)

func main() {
provider := openai.New(openai.Params{Key: "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"})
params := openai.TextToTextParams{
Model: "gpt-3.5-turbo",
Temperature: openai.Temperature(0),
MaxTokens: 100,
}

result, err := agency.NewProcess(
agency.ProcessStep{
Operation: provider.
TextToText(params).
SetPrompt("Increase the number by adding 1 to it. Answer only in numbers, without text"),
},
agency.ProcessStep{
Operation: provider.
TextToText(params).
SetPrompt("Double the number. Answer only in numbers, without text"),
},
agency.ProcessStep{
ConfigFunc: func(history agency.ProcessHistory, cfg *agency.OperationConfig) error {
firstStepResult, _ := history.Get(0)
cfg.Prompt = fmt.Sprintf("Add %s", firstStepResult)
return nil
},
Operation: provider.TextToText(params),
},
).Execute(
context.Background(),
agency.UserMessage("5"),
func(in, out agency.Message, cfg *agency.OperationConfig, stepIndex uint) {
fmt.Printf("---\n\nSTEP %d executed\n\nINPUT: %v\n\nCONFIG: %v\n\nOUTPUT: %v\n\n", stepIndex, in, cfg, out)
},
)

if err != nil {
panic(err)
}

fmt.Println(result)
}

// InjectHistory allows to pass history between operations by injecting it into the config.
func InjectHistory(history agency.ProcessHistory, cfg *agency.OperationConfig) error {
cfg.Messages = history.All()
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,31 @@ import (

type Saver []agency.Message

func (s *Saver) Save(input, output agency.Message, _ *agency.OperationConfig) {
// This is how we can retrieve process history by hand with the interceptor, without using the history itself.
// But we can't (or it's hard to do) pass history between steps this way. For that we can use config func.
func (s *Saver) Save(input, output agency.Message, _ *agency.OperationConfig, _ uint) {
*s = append(*s, output)
}

func main() {
factory := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")})
provider := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")})

// step 1
hear := factory.
hear := provider.
SpeechToText(openai.SpeechToTextParams{
Model: goopenai.Whisper1,
})

// step2
translate := factory.
translate := provider.
TextToText(openai.TextToTextParams{
Model: "gpt-3.5-turbo",
Temperature: openai.Temperature(0.5),
}).
SetPrompt("translate to russian")

// step 3
uppercase := factory.
uppercase := provider.
TextToText(openai.TextToTextParams{
Model: "gpt-3.5-turbo",
Temperature: openai.Temperature(1),
Expand All @@ -54,7 +56,7 @@ func main() {
ctx := context.Background()
speechMsg := agency.Message{Content: sound}

_, err = agency.NewProcess(
_, err = agency.ProcessFromOperations(
hear,
translate,
uppercase,
Expand Down
2 changes: 1 addition & 1 deletion examples/rag_vector_database/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func main() {
Model: "tts-1", ResponseFormat: "mp3", Speed: 1, Voice: "onyx",
})

result, err := agency.NewProcess(
result, err := agency.ProcessFromOperations(
retrieve,
summarize,
voice,
Expand Down
4 changes: 2 additions & 2 deletions examples/speech_to_text/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
)

func main() {
factory := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")})
provider := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")})

data, err := os.ReadFile("speech.mp3")
if err != nil {
panic(err)
}

result, err := factory.SpeechToText(openai.SpeechToTextParams{
result, err := provider.SpeechToText(openai.SpeechToTextParams{
Model: goopenai.Whisper1,
}).Execute(
context.Background(),
Expand Down
2 changes: 1 addition & 1 deletion examples/speech_to_text_to_image/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func main() {
panic(err)
}

msg, err := agency.NewProcess(
msg, err := agency.ProcessFromOperations(
factory.SpeechToText(openai.SpeechToTextParams{Model: goopenai.Whisper1}),
factory.TextToImage(openai.TextToImageParams{
Model: goopenai.CreateImageModelDallE2,
Expand Down
4 changes: 2 additions & 2 deletions examples/translate_text/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
)

func main() {
factory := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")})
provider := openai.New(openai.Params{Key: os.Getenv("OPENAI_API_KEY")})

result, err := factory.
result, err := provider.
TextToText(openai.TextToTextParams{Model: goopenai.GPT3Dot5Turbo}).
SetPrompt("You are a helpful assistant that translates English to French").
Execute(context.Background(), agency.UserMessage("I love programming."))
Expand Down
81 changes: 67 additions & 14 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,88 @@ package agency

import (
"context"
"errors"
"fmt"
)

// Process is a chain of operations that can be executed in sequence.
// Process is a sequential chain of steps operations that can be executed in sequence.
type Process struct {
operations []*Operation
steps []ProcessStep
}

func NewProcess(operations ...*Operation) *Process {
return &Process{
operations: operations,
// ProcessStep is an object that can be chained with other steps forming the process.
type ProcessStep struct {
// Operation that current step depends on.
// It's execution is deferred until the process reaches the corresponding step.
Operation *Operation
// ConfigFunc allows to modify config based a on results from the previous steps.
// It's execution is deferred until the process reaches the corresponding step.
ConfigFunc func(ProcessHistory, *OperationConfig) error
}

// NewProcess creates new process based on a given steps. If you don't need history use ProcessFromOperations instead.
func NewProcess(steps ...ProcessStep) *Process {
return &Process{steps: steps}
}

// ProcessFromOperations allows to create process from operations.
// It's handy when all you need is to chain some operations together and you don't want to have an access to history.
func ProcessFromOperations(operations ...*Operation) *Process {
steps := make([]ProcessStep, 0, len(operations))
for _, operation := range operations {
steps = append(steps, ProcessStep{Operation: operation, ConfigFunc: nil})
}
return &Process{steps: steps}
}

// ProcessInterceptor is a function that is called by Process after one step finished but before next one is started.
type ProcessInterceptor func(in Message, out Message, cfg *OperationConfig, stepIndex uint)

// ProcessHistory stores results of the previous steps of the process. It's a process's execution context.
type ProcessHistory interface {
Get(stepIndex uint) (Message, error) // Get takes index (starts from zero) of the step which result we want to get
All() []Message // All allows to retrieve all the history of the previously processed steps
}

// processHistory implements ProcessHistory interfaces via simple slice of messages
type processHistory []Message

// Get is a panic-free way to get a message by index of the step. Indexes starts with zero. Index must be < steps count
func (p processHistory) Get(stepIndex uint) (Message, error) {
i := int(stepIndex)
if i >= len(p) {
return Message{}, errors.New("step index must less than the number of steps")
}
return p[i], nil
}

// All simply returns p as it is.
func (p processHistory) All() []Message {
return p
}

// Interceptor is a function that is called by Process after one operation finished but before next one is started.
type Interceptor func(in Message, out Message, cfg *OperationConfig)
// Execute loops over process steps and sequentially executes them by passing output of one step as an input to another.
// If interceptors are provided, they are called on each step. So for N steps and M interceptors there's N x M executions.
func (p *Process) Execute(ctx context.Context, input Message, interceptors ...ProcessInterceptor) (Message, error) {
history := make(processHistory, 0, len(p.steps))

for i, step := range p.steps {
if step.ConfigFunc != nil {
if err := step.ConfigFunc(history, step.Operation.config); err != nil {
return Message{}, fmt.Errorf("config func on step %d: %w", i, err)
}
}

// Execute iterates over Process's operations and sequentially executes them.
// After first operation is executed it uses its output as an input to the second one and so on until the whole chain is finished.
// It also executes all given interceptors, if they are provided, so for every N operations and M interceptors it's N x M executions.
func (p *Process) Execute(ctx context.Context, input Message, interceptors ...Interceptor) (Message, error) {
for _, operation := range p.operations {
output, err := operation.Execute(ctx, input)
output, err := step.Operation.Execute(ctx, input)
if err != nil {
return Message{}, err
}

history = append(history, output)

// FIXME while these are called AFTER operation and not before it's impossible to modify configuration
for _, interceptor := range interceptors {
interceptor(input, output, operation.Config())
interceptor(input, output, step.Operation.config, uint(i))
}

input = output
Expand Down