diff --git a/core/config/meta/registry.go b/core/config/meta/registry.go index a1cfe4c9aabd..3476076e1442 100644 --- a/core/config/meta/registry.go +++ b/core/config/meta/registry.go @@ -537,6 +537,36 @@ func DefaultRegistry() map[string]FieldMetaOverride { Component: "number", Order: 79, }, + "pipeline.compaction.enabled": { + Section: "pipeline", + Label: "Compaction Enabled", + Description: "Fold conversation items that age out of the live window (Max History Items) into a rolling summary instead of dropping them, so long realtime sessions stay cheap without losing earlier context. Off by default.", + Component: "toggle", + Order: 80, + }, + "pipeline.compaction.trigger_items": { + Section: "pipeline", + Label: "Compaction Trigger Items", + Description: "High-water mark: once the live conversation exceeds this many items, the overflow above Max History Items is summarized and evicted. Must be greater than Max History Items; defaults to twice it. The gap controls how often summarization runs.", + Component: "number", + Order: 81, + }, + "pipeline.compaction.summary_model": { + Section: "pipeline", + Label: "Compaction Summary Model", + Description: "Optional smaller/cheaper model used to produce the rolling summary. Empty reuses the pipeline's own LLM. On CPU, a tiny model here keeps compaction from competing with the conversation LLM.", + Component: "input", + Advanced: true, + Order: 82, + }, + "pipeline.compaction.max_summary_tokens": { + Section: "pipeline", + Label: "Compaction Max Summary Tokens", + Description: "Advisory cap on the rolling summary length (fed to the summarizer prompt). Defaults to 512.", + Component: "number", + Advanced: true, + Order: 83, + }, // --- Functions --- "function.grammar.parallel_calls": { diff --git a/core/config/model_config.go b/core/config/model_config.go index cbb33683838b..8886ddfd5a5d 100644 --- a/core/config/model_config.go +++ b/core/config/model_config.go @@ -641,11 +641,32 @@ type Pipeline struct { // context fills. MaxHistoryItems *int `yaml:"max_history_items,omitempty" json:"max_history_items,omitempty"` + // Compaction folds conversation items that age out of the live window + // (max_history_items) into a rolling summary instead of dropping them, so + // long realtime sessions stay cheap without losing earlier context. Nil + // (block absent) means disabled, preserving existing behavior. + Compaction *PipelineCompaction `yaml:"compaction,omitempty" json:"compaction,omitempty"` + // VoiceRecognition gates the pipeline behind speaker verification. Nil // (block absent) means no gate, preserving existing behavior. VoiceRecognition *PipelineVoiceRecognition `yaml:"voice_recognition,omitempty" json:"voice_recognition,omitempty"` } +// PipelineCompaction configures summarize-then-drop for a realtime pipeline. +type PipelineCompaction struct { + // Enabled turns summarize-then-drop on. Default false. + Enabled bool `yaml:"enabled,omitempty" json:"enabled,omitempty"` + // TriggerItems is the high-water mark: once live items exceed it, overflow + // above max_history_items is summarized and evicted. Must exceed + // max_history_items; clamped up if not. Default: 2x max_history_items. + TriggerItems int `yaml:"trigger_items,omitempty" json:"trigger_items,omitempty"` + // SummaryModel optionally names a smaller/cheaper model for the summary + // call. Empty uses the pipeline's own LLM. + SummaryModel string `yaml:"summary_model,omitempty" json:"summary_model,omitempty"` + // MaxSummaryTokens advises the summary length (fed to the prompt). Default 512. + MaxSummaryTokens int `yaml:"max_summary_tokens,omitempty" json:"max_summary_tokens,omitempty"` +} + // ApplyReasoningEffort resolves the effective reasoning effort — a per-request // value (requestEffort) overrides the config's own ReasoningEffort default — // stores it on the config so gRPCPredictOpts forwards it to the backend as the diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 1af4c6b75dee..d4d6a0ac40de 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -12,6 +12,7 @@ import ( "os" "strconv" "sync" + "sync/atomic" "time" "net/http" @@ -134,6 +135,18 @@ type Session struct { // pairs are kept together so we never feed an orphaned tool result. MaxHistoryItems int + // Compaction settings resolved from pipeline.compaction (see resolveCompaction). + CompactionEnabled bool + CompactionTrigger int + SummaryModel string + MaxSummaryTokens int + + // summarizerFactory lazily builds the model used for compaction summaries + // when summary_model is configured; nil means reuse the pipeline LLM. + summarizerFactory func() (Model, error) + summarizerOnce sync.Once + summarizerCached Model + // AssistantExecutor is non-nil when the session opted into the in-process // LocalAI Assistant tool surface. Tool calls whose name matches this // executor's catalog are run inproc and their output is fed back to the @@ -241,6 +254,12 @@ type Conversation struct { ID string Items []*types.MessageItemUnion Lock sync.Mutex + // Memory is the rolling summary of items already evicted by compaction. It + // is kept out of Items (so trimRealtimeItems never drops it) and rendered + // as a system message right after the session instructions. + Memory string + // compacting ensures at most one background compaction runs per conversation. + compacting atomic.Bool } func (c *Conversation) ToServer() types.Conversation { @@ -540,13 +559,12 @@ func runRealtimeSession(application *application.Application, t Transport, model SoundDetectionWindowMs: cfg.Pipeline.SoundDetectionWindowMs, SoundDetectionHopMs: cfg.Pipeline.SoundDetectionHopMs, } + session.CompactionEnabled, session.CompactionTrigger, session.MaxSummaryTokens, session.SummaryModel = resolveCompaction(cfg, session.MaxHistoryItems) // Create a default conversation conversationID := generateConversationID() conversation := &Conversation{ - ID: conversationID, - // TODO: We need to truncate the conversation items when a new item is added and we have run out of space. There are multiple places where items - // can be added so we could use a datastructure here that enforces truncation upon addition + ID: conversationID, Items: []*types.MessageItemUnion{}, } session.Conversations[conversationID] = conversation @@ -577,6 +595,18 @@ func runRealtimeSession(application *application.Application, t Transport, model } session.ModelInterface = m + if session.SummaryModel != "" { + summaryModelName := session.SummaryModel + sid := sessionID + session.summarizerFactory = func() (Model, error) { + summaryCfg, lerr := application.ModelConfigLoader().LoadModelConfigFileByNameDefaultOptions(summaryModelName, application.ApplicationConfig()) + if lerr != nil { + return nil, fmt.Errorf("load summary model config %q: %w", summaryModelName, lerr) + } + return newModel(&summaryCfg.Pipeline, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), evaluator, buildRealtimeRoutingContext(application, sid)) + } + } + if cfg.Pipeline.VoiceGateEnabled() { gate, gerr := newVoiceGate( *cfg.Pipeline.VoiceRecognition, @@ -807,6 +837,15 @@ func runRealtimeSession(application *application.Application, t Transport, model commitUtterance(respCtx, allAudio, session, conversation, t) }() + case types.InputAudioBufferClearEvent: + xlog.Debug("recv", "message", string(msg)) + // Discard a partially-captured utterance so the client can restart + // input cleanly without the stale buffer leaking into the next commit. + clearInputAudio(session) + sendEvent(t, types.InputAudioBufferClearedEvent{ + ServerEventBase: types.ServerEventBase{EventID: e.EventID}, + }) + case types.ConversationItemCreateEvent: xlog.Debug("recv", "message", string(msg)) // Add the item to the conversation @@ -841,7 +880,39 @@ func runRealtimeSession(application *application.Application, t Transport, model }) case types.ConversationItemDeleteEvent: - sendError(t, "not_implemented", "Deleting items not implemented", "", "event_TODO") + xlog.Debug("recv", "message", string(msg)) + if e.ItemID == "" { + sendError(t, "invalid_item_id", "Need item_id, but none specified", "", "event_TODO") + continue + } + conversation.Lock.Lock() + updated, ok := deleteItem(conversation.Items, e.ItemID) + conversation.Items = updated + conversation.Lock.Unlock() + if !ok { + sendError(t, "invalid_item_id", "Item to delete not found", "", "event_TODO") + continue + } + sendEvent(t, types.ConversationItemDeletedEvent{ + ServerEventBase: types.ServerEventBase{EventID: e.EventID}, + ItemID: e.ItemID, + }) + + case types.ConversationItemTruncateEvent: + xlog.Debug("recv", "message", string(msg)) + conversation.Lock.Lock() + ok := truncateAssistantText(conversation.Items, e.ItemID, e.ContentIndex) + conversation.Lock.Unlock() + if !ok { + sendError(t, "invalid_item_id", "Item to truncate not found", "", "event_TODO") + continue + } + sendEvent(t, types.ConversationItemTruncatedEvent{ + ServerEventBase: types.ServerEventBase{EventID: e.EventID}, + ItemID: e.ItemID, + ContentIndex: e.ContentIndex, + AudioEndMs: e.AudioEndMs, + }) case types.ConversationItemRetrieveEvent: xlog.Debug("recv", "message", string(msg)) @@ -854,21 +925,7 @@ func runRealtimeSession(application *application.Application, t Transport, model conversation.Lock.Lock() var retrievedItem types.MessageItemUnion for _, item := range conversation.Items { - // We need to check ID in the union - var id string - if item.System != nil { - id = item.System.ID - } else if item.User != nil { - id = item.User.ID - } else if item.Assistant != nil { - id = item.Assistant.ID - } else if item.FunctionCall != nil { - id = item.FunctionCall.ID - } else if item.FunctionCallOutput != nil { - id = item.FunctionCallOutput.ID - } - - if id == e.ItemID { + if itemID(item) == e.ItemID { retrievedItem = *item break } @@ -1666,6 +1723,9 @@ const maxAssistantToolTurns = 10 func triggerResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, overrides *types.ResponseCreateParams) { triggerResponseAtTurn(ctx, session, conv, t, overrides, 0) + // Fold aged-out turns into the rolling memory off the critical path; the + // next turn reaps the smaller buffer. + session.maybeCompact(conv) } func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversation, t Transport, overrides *types.ResponseCreateParams, toolTurn int) { @@ -1721,6 +1781,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa var lastUserSpeaker *types.Speaker personalize := session.voiceGate != nil && session.voiceGate.cfg.PersonalizeEnabled() conv.Lock.Lock() + conversationHistory = withMemory(conversationHistory, conv.Memory) items := trimRealtimeItems(conv.Items, session.MaxHistoryItems) for _, item := range items { if item.User != nil { diff --git a/core/http/endpoints/openai/realtime_compaction.go b/core/http/endpoints/openai/realtime_compaction.go new file mode 100644 index 000000000000..f79a2d7a240c --- /dev/null +++ b/core/http/endpoints/openai/realtime_compaction.go @@ -0,0 +1,326 @@ +package openai + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/pkg/reasoning" + "github.com/mudler/xlog" +) + +const ( + defaultMaxSummaryTokens = 512 + memoryPrefix = "Summary of earlier conversation:\n" + // compactionTimeout bounds the summarizer call so a stuck model can't pin the + // compacting flag (and thus block all further compaction) forever. + compactionTimeout = 60 * time.Second +) + +// withMemory inserts the rolling summary as a system message after the existing +// (instructions) history. No-op when memory is empty. +func withMemory(history schema.Messages, memory string) schema.Messages { + if memory == "" { + return history + } + content := memoryPrefix + memory + return append(history, schema.Message{ + Role: string(types.MessageRoleSystem), + StringContent: content, + Content: content, + }) +} + +// renderItemsTranscript renders conversation items as a plain "role: text" +// transcript for summarization. Non-text items (bare tool calls) are labelled +// so the summarizer keeps track of actions taken. +func renderItemsTranscript(items []*types.MessageItemUnion) string { + var b strings.Builder + for _, item := range items { + switch { + case item.User != nil: + b.WriteString("user: ") + for _, c := range item.User.Content { + if c.Text != "" { + b.WriteString(c.Text) + } + if c.Transcript != "" { + b.WriteString(c.Transcript) + } + } + b.WriteString("\n") + case item.Assistant != nil: + b.WriteString("assistant: ") + // Realtime assistant *audio* turns store the spoken words in + // .Transcript (not .Text), so emit both or spoken turns are dropped. + for _, c := range item.Assistant.Content { + if c.Text != "" { + b.WriteString(c.Text) + } + if c.Transcript != "" { + b.WriteString(c.Transcript) + } + } + b.WriteString("\n") + case item.FunctionCall != nil: + b.WriteString(fmt.Sprintf("assistant called tool %s(%s)\n", item.FunctionCall.Name, item.FunctionCall.Arguments)) + case item.FunctionCallOutput != nil: + b.WriteString(fmt.Sprintf("tool result: %s\n", item.FunctionCallOutput.Output)) + } + } + return strings.TrimSpace(b.String()) +} + +// buildSummaryMessages builds the chat messages for the summarizer LLM: a system +// instruction plus prior memory and the new transcript to fold in. maxTokens is +// advisory (fed to the prompt; not hard-enforced in v1). +func buildSummaryMessages(priorMemory, transcript string, maxTokens int) schema.Messages { + system := fmt.Sprintf("You maintain a running memory of a live voice conversation. "+ + "Merge the prior memory with the new exchanges into an updated memory. "+ + "Keep names, decisions, facts, preferences, and open threads. Be concise "+ + "(under ~%d tokens). Output only the updated memory, with no reasoning or tags.", maxTokens) + var user strings.Builder + if priorMemory != "" { + user.WriteString("Prior memory:\n") + user.WriteString(priorMemory) + user.WriteString("\n\n") + } + user.WriteString("New exchanges to fold in:\n") + user.WriteString(transcript) + return schema.Messages{ + {Role: string(types.MessageRoleSystem), StringContent: system, Content: system}, + {Role: string(types.MessageRoleUser), StringContent: user.String(), Content: user.String()}, + } +} + +// clearInputAudio resets the session's pending input audio buffer (the raw +// PCM and any buffered Opus frames). Used by the input_audio_buffer.clear +// realtime event so a client can discard a partially-captured utterance. +func clearInputAudio(s *Session) { + s.AudioBufferLock.Lock() + s.InputAudioBuffer = nil + s.AudioBufferLock.Unlock() + s.OpusFramesLock.Lock() + s.OpusFrames = nil + s.OpusFramesLock.Unlock() +} + +// itemID extracts the id from any MessageItemUnion variant ("" if none). +func itemID(item *types.MessageItemUnion) string { + switch { + case item == nil: + return "" + case item.System != nil: + return item.System.ID + case item.User != nil: + return item.User.ID + case item.Assistant != nil: + return item.Assistant.ID + case item.FunctionCall != nil: + return item.FunctionCall.ID + case item.FunctionCallOutput != nil: + return item.FunctionCallOutput.ID + default: + return "" + } +} + +// deleteItem removes the item with id from items, returning the new slice and +// whether it was found. +func deleteItem(items []*types.MessageItemUnion, id string) ([]*types.MessageItemUnion, bool) { + for i, item := range items { + if itemID(item) == id { + return append(items[:i:i], items[i+1:]...), true + } + } + return items, false +} + +// truncateAssistantText clears the text of the assistant item's content part at +// contentIndex. Minimal truncate: used to discard an interrupted/barge-in +// response tail. Both .Text and .Transcript are cleared because realtime audio +// turns store the spoken words in .Transcript (clearing only .Text would no-op). +func truncateAssistantText(items []*types.MessageItemUnion, id string, contentIndex int) bool { + for _, item := range items { + if itemID(item) != id || item.Assistant == nil { + continue + } + if contentIndex >= 0 && contentIndex < len(item.Assistant.Content) { + item.Assistant.Content[contentIndex].Text = "" + item.Assistant.Content[contentIndex].Transcript = "" + } + return true + } + return false +} + +// compactionCut returns the index splitting items into overflow (items[:cut], +// to be summarized+evicted) and the kept live tail (items[cut:]), keeping the +// last `keep` items. It mirrors trimRealtimeItems' pair-safety: the cut is +// pulled left so a function_call and its function_call_output are never split +// across the boundary (the whole pair lands in the kept tail). Returns 0 when +// there is nothing to cut. +func compactionCut(items []*types.MessageItemUnion, keep int) int { + // keep <= 0 means no live-window cap (the "unlimited history" sentinel, as + // in trimRealtimeItems): there is nothing to evict, so cut nothing. This + // also avoids indexing items[len(items)] in the pair-safety loop below. + if keep <= 0 { + return 0 + } + cut := len(items) - keep + if cut <= 0 { + return 0 + } + for cut > 0 && items[cut] != nil && items[cut].FunctionCallOutput != nil { + cut-- + } + return cut +} + +// resolveCompaction reads the pipeline.compaction block, applying defaults and +// the trigger>max_history invariant. maxHistory is the already-resolved live +// window size. Returns enabled=false (and zero values) when compaction is off. +func resolveCompaction(cfg *config.ModelConfig, maxHistory int) (enabled bool, trigger, maxSummaryTokens int, summaryModel string) { + if cfg == nil || cfg.Pipeline.Compaction == nil || !cfg.Pipeline.Compaction.Enabled { + return false, 0, 0, "" + } + c := cfg.Pipeline.Compaction + trigger = c.TriggerItems + if trigger <= 0 { + trigger = maxHistory * 2 + } + if trigger <= maxHistory { + trigger = maxHistory + 1 + } + maxSummaryTokens = c.MaxSummaryTokens + if maxSummaryTokens <= 0 { + maxSummaryTokens = defaultMaxSummaryTokens + } + return true, trigger, maxSummaryTokens, c.SummaryModel +} + +// prefixMatches reports whether items begins with the same ids, in order, as +// snapshot — i.e. the overflow we summarized is still at the head (no concurrent +// client delete reshuffled it). +func prefixMatches(items, snapshot []*types.MessageItemUnion) bool { + if len(items) < len(snapshot) { + return false + } + for i := range snapshot { + if itemID(items[i]) != itemID(snapshot[i]) { + return false + } + } + return true +} + +// compact folds overflow items into conv.Memory and evicts them. It never holds +// conv.Lock across the summarizer call: snapshot under lock, summarize unlocked, +// commit under lock (re-validating the head is unchanged). On any error it +// leaves the conversation untouched — items are never dropped without a summary. +func (s *Session) compact(conv *Conversation, model Model) { + if model == nil { + return + } + // Snapshot. + conv.Lock.Lock() + if len(conv.Items) <= s.CompactionTrigger { + conv.Lock.Unlock() + return + } + cut := compactionCut(conv.Items, s.MaxHistoryItems) + if cut <= 0 { + conv.Lock.Unlock() + return + } + overflow := append([]*types.MessageItemUnion(nil), conv.Items[:cut]...) + prior := conv.Memory + conv.Lock.Unlock() + + // Summarize (unlocked). + msgs := buildSummaryMessages(prior, renderItemsTranscript(overflow), s.MaxSummaryTokens) + ctx, cancel := context.WithTimeout(context.Background(), compactionTimeout) + defer cancel() + predFunc, err := model.Predict(ctx, msgs, nil, nil, nil, nil, nil, nil, nil, nil, nil) + if err != nil { + xlog.Warn("realtime compaction: summarizer predict failed", "error", err) + return + } + pred, err := predFunc() + if err != nil { + xlog.Warn("realtime compaction: summarizer inference failed", "error", err) + return + } + // Strip any leaked reasoning/thinking spans using the same extractor the + // rest of the realtime path uses, rather than a bespoke regex. + rcfg := reasoning.Config{} + if mc := model.PredictConfig(); mc != nil { + rcfg = spokenReasoningConfig(mc.ReasoningConfig) + } + _, summary := reasoning.ExtractReasoningComplete(pred.Response, "", rcfg) + summary = strings.TrimSpace(summary) + if summary == "" { + xlog.Warn("realtime compaction: empty summary, skipping eviction") + return + } + + // Commit. + conv.Lock.Lock() + defer conv.Lock.Unlock() + if !prefixMatches(conv.Items, overflow) { + xlog.Debug("realtime compaction: head changed during summary, skipping") + return + } + conv.Memory = summary + conv.Items = conv.Items[len(overflow):] + xlog.Debug("realtime compaction: evicted items into memory", "evicted", len(overflow), "remaining", len(conv.Items)) +} + +// summarizerModel resolves the model used to produce compaction summaries. +// Without a configured summary_model (or factory) it reuses the pipeline LLM. +func (s *Session) summarizerModel() Model { + if s.SummaryModel == "" || s.summarizerFactory == nil { + return s.ModelInterface + } + s.summarizerOnce.Do(func() { + m, err := s.summarizerFactory() + if err != nil { + xlog.Warn("realtime compaction: summary_model load failed, falling back to pipeline LLM", "model", s.SummaryModel, "error", err) + m = s.ModelInterface + } + s.summarizerCached = m + }) + return s.summarizerCached +} + +// maybeCompact schedules a background compaction when the live buffer has grown +// past the trigger and none is already running. Returns immediately. +func (s *Session) maybeCompact(conv *Conversation) { + if !s.CompactionEnabled { + return + } + conv.Lock.Lock() + over := len(conv.Items) > s.CompactionTrigger + conv.Lock.Unlock() + if !over { + return + } + if !conv.compacting.CompareAndSwap(false, true) { + return + } + go func() { + defer conv.compacting.Store(false) + // Resolve (and, for a configured summary_model, lazily load) the + // summarizer only when a compaction actually runs, off the response + // path — so the model load never blocks a user turn. + model := s.summarizerModel() + if model == nil { + return + } + s.compact(conv, model) + }() +} diff --git a/core/http/endpoints/openai/realtime_compaction_test.go b/core/http/endpoints/openai/realtime_compaction_test.go new file mode 100644 index 000000000000..5b19a8259ecc --- /dev/null +++ b/core/http/endpoints/openai/realtime_compaction_test.go @@ -0,0 +1,308 @@ +package openai + +import ( + "errors" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/backend" + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/core/schema" +) + +var _ = Describe("resolveCompaction", func() { + It("disables when the block is absent", func() { + enabled, _, _, _ := resolveCompaction(&config.ModelConfig{}, 6) + Expect(enabled).To(BeFalse()) + }) + + It("defaults trigger to 2x max history and tokens to 512", func() { + cfg := &config.ModelConfig{Pipeline: config.Pipeline{Compaction: &config.PipelineCompaction{Enabled: true}}} + enabled, trigger, maxTok, _ := resolveCompaction(cfg, 6) + Expect(enabled).To(BeTrue()) + Expect(trigger).To(Equal(12)) + Expect(maxTok).To(Equal(512)) + }) + + It("clamps trigger to max history + 1 when misconfigured", func() { + cfg := &config.ModelConfig{Pipeline: config.Pipeline{Compaction: &config.PipelineCompaction{Enabled: true, TriggerItems: 4}}} + _, trigger, _, _ := resolveCompaction(cfg, 6) + Expect(trigger).To(Equal(7)) + }) + + It("honors explicit values", func() { + cfg := &config.ModelConfig{Pipeline: config.Pipeline{Compaction: &config.PipelineCompaction{ + Enabled: true, TriggerItems: 20, MaxSummaryTokens: 256, SummaryModel: "tiny"}}} + enabled, trigger, maxTok, model := resolveCompaction(cfg, 6) + Expect(enabled).To(BeTrue()) + Expect(trigger).To(Equal(20)) + Expect(maxTok).To(Equal(256)) + Expect(model).To(Equal("tiny")) + }) +}) + +var _ = Describe("deleteItem", func() { + mk := func(ids ...string) []*types.MessageItemUnion { + out := make([]*types.MessageItemUnion, len(ids)) + for i, id := range ids { + out[i] = &types.MessageItemUnion{User: &types.MessageItemUser{ID: id}} + } + return out + } + + It("removes the item with the given id", func() { + items, ok := deleteItem(mk("a", "b", "c"), "b") + Expect(ok).To(BeTrue()) + Expect(len(items)).To(Equal(2)) + Expect(itemID(items[0])).To(Equal("a")) + Expect(itemID(items[1])).To(Equal("c")) + }) + + It("reports not found for an unknown id", func() { + _, ok := deleteItem(mk("a"), "zzz") + Expect(ok).To(BeFalse()) + }) +}) + +var _ = Describe("clearInputAudio", func() { + It("resets the pending PCM and buffered Opus frames", func() { + s := &Session{InputAudioBuffer: []byte{1, 2, 3}, OpusFrames: [][]byte{{9}}} + clearInputAudio(s) + Expect(s.InputAudioBuffer).To(BeNil()) + Expect(s.OpusFrames).To(BeNil()) + }) +}) + +var _ = Describe("truncateAssistantText", func() { + It("clears the text of the assistant content part at the index", func() { + items := []*types.MessageItemUnion{{Assistant: &types.MessageItemAssistant{ + ID: "a1", + Content: []types.MessageContentOutput{{Type: types.MessageContentTypeText, Text: "hello world"}}, + }}} + ok := truncateAssistantText(items, "a1", 0) + Expect(ok).To(BeTrue()) + Expect(items[0].Assistant.Content[0].Text).To(Equal("")) + }) + + // Realtime assistant *audio* turns store the spoken words in .Transcript, not + // .Text, so a barge-in truncate must clear .Transcript too or it would no-op. + It("clears the transcript of an assistant audio content part", func() { + items := []*types.MessageItemUnion{{Assistant: &types.MessageItemAssistant{ + ID: "a1", + Content: []types.MessageContentOutput{{Type: types.MessageContentTypeAudio, Transcript: "hello world"}}, + }}} + ok := truncateAssistantText(items, "a1", 0) + Expect(ok).To(BeTrue()) + Expect(items[0].Assistant.Content[0].Transcript).To(Equal("")) + }) + + It("returns false for an unknown id", func() { + Expect(truncateAssistantText(nil, "nope", 0)).To(BeFalse()) + }) +}) + +var _ = Describe("compactionCut", func() { + user := func(id string) *types.MessageItemUnion { + return &types.MessageItemUnion{User: &types.MessageItemUser{ID: id}} + } + call := func(id string) *types.MessageItemUnion { + return &types.MessageItemUnion{FunctionCall: &types.MessageItemFunctionCall{ID: id}} + } + out := func(id string) *types.MessageItemUnion { + return &types.MessageItemUnion{FunctionCallOutput: &types.MessageItemFunctionCallOutput{ID: id}} + } + + It("cuts exactly len-keep when no pairs straddle the boundary", func() { + items := []*types.MessageItemUnion{user("1"), user("2"), user("3"), user("4")} + Expect(compactionCut(items, 2)).To(Equal(2)) + }) + + It("returns 0 when nothing to cut", func() { + Expect(compactionCut([]*types.MessageItemUnion{user("1")}, 2)).To(Equal(0)) + }) + + It("returns 0 (cuts nothing) when keep is 0 — the unlimited-window sentinel", func() { + items := []*types.MessageItemUnion{user("1"), user("2"), user("3")} + Expect(compactionCut(items, 0)).To(Equal(0)) + }) + + It("moves the boundary so a call/output pair is not split", func() { + // keep=2 -> naive cut=2, but items[2] is the output of items[1]'s call; + // pull the cut right so the whole pair stays in the kept tail. + items := []*types.MessageItemUnion{user("1"), call("c"), out("c"), user("4")} + Expect(compactionCut(items, 2)).To(Equal(1)) + }) +}) + +var _ = Describe("withMemory", func() { + It("inserts a memory system message when memory is non-empty", func() { + base := schema.Messages{{Role: "system", StringContent: "instructions"}} + out := withMemory(base, "user is Bob; wants pizza") + Expect(len(out)).To(Equal(2)) + Expect(out[1].Role).To(Equal("system")) + Expect(out[1].StringContent).To(ContainSubstring("user is Bob")) + Expect(out[1].StringContent).To(ContainSubstring("Summary of earlier conversation")) + }) + + It("is a no-op when memory is empty", func() { + base := schema.Messages{{Role: "system", StringContent: "instructions"}} + Expect(withMemory(base, "")).To(HaveLen(1)) + }) +}) + +var _ = Describe("renderItemsTranscript", func() { + It("renders user and assistant text turns", func() { + items := []*types.MessageItemUnion{ + {User: &types.MessageItemUser{Content: []types.MessageContentInput{{Type: types.MessageContentTypeInputText, Text: "hi"}}}}, + {Assistant: &types.MessageItemAssistant{Content: []types.MessageContentOutput{{Type: types.MessageContentTypeText, Text: "hello"}}}}, + } + out := renderItemsTranscript(items) + Expect(out).To(ContainSubstring("user: hi")) + Expect(out).To(ContainSubstring("assistant: hello")) + }) + + // Realtime assistant *audio* turns store the spoken words in .Transcript, not + // .Text, so the transcript builder must emit .Transcript too or spoken turns + // would be dropped from the summary. + It("renders an assistant audio turn from its transcript", func() { + items := []*types.MessageItemUnion{ + {Assistant: &types.MessageItemAssistant{Content: []types.MessageContentOutput{{Type: types.MessageContentTypeAudio, Transcript: "spoken words"}}}}, + } + Expect(renderItemsTranscript(items)).To(ContainSubstring("assistant: spoken words")) + }) +}) + +var _ = Describe("buildSummaryMessages", func() { + It("includes prior memory and the new transcript", func() { + msgs := buildSummaryMessages("prior facts", "user: hi", 512) + Expect(len(msgs)).To(Equal(2)) + Expect(msgs[0].Role).To(Equal("system")) + Expect(msgs[1].StringContent).To(ContainSubstring("prior facts")) + Expect(msgs[1].StringContent).To(ContainSubstring("user: hi")) + }) +}) + +var _ = Describe("compact", func() { + user := func(id, text string) *types.MessageItemUnion { + return &types.MessageItemUnion{User: &types.MessageItemUser{ID: id, + Content: []types.MessageContentInput{{Type: types.MessageContentTypeInputText, Text: text}}}} + } + + It("summarizes overflow into Memory and evicts it, keeping the live tail", func() { + conv := &Conversation{Items: []*types.MessageItemUnion{ + user("1", "a"), user("2", "b"), user("3", "c"), user("4", "d"), + user("5", "e"), user("6", "f"), user("7", "g"), user("8", "h"), + }} + s := &Session{CompactionEnabled: true, CompactionTrigger: 7, MaxHistoryItems: 4, MaxSummaryTokens: 512} + m := &fakeModel{predictResp: backend.LLMResponse{Response: "ROLLED UP"}} + + s.compact(conv, m) + + Expect(conv.Memory).To(Equal("ROLLED UP")) + Expect(len(conv.Items)).To(Equal(4)) + Expect(itemID(conv.Items[0])).To(Equal("5")) + // The summarizer saw the evicted turns. + Expect(m.lastMessages[1].StringContent).To(ContainSubstring("a")) + }) + + It("leaves Items and Memory untouched when the summarizer errors", func() { + items := []*types.MessageItemUnion{user("1", "a"), user("2", "b"), user("3", "c")} + conv := &Conversation{Items: items} + s := &Session{CompactionEnabled: true, CompactionTrigger: 2, MaxHistoryItems: 1, MaxSummaryTokens: 512} + m := &fakeModel{predictErr: errors.New("boom")} + + s.compact(conv, m) + + Expect(conv.Memory).To(Equal("")) + Expect(len(conv.Items)).To(Equal(3)) + }) + + It("strips leaked reasoning tags from the summary via the shared extractor", func() { + conv := &Conversation{Items: []*types.MessageItemUnion{ + user("1", "a"), user("2", "b"), user("3", "c"), user("4", "d"), + user("5", "e"), user("6", "f"), user("7", "g"), user("8", "h"), + }} + s := &Session{CompactionEnabled: true, CompactionTrigger: 7, MaxHistoryItems: 4, MaxSummaryTokens: 512} + m := &fakeModel{predictResp: backend.LLMResponse{Response: "planning the summaryCLEAN SUMMARY"}} + + s.compact(conv, m) + + Expect(conv.Memory).To(Equal("CLEAN SUMMARY")) + Expect(conv.Memory).ToNot(ContainSubstring("planning")) + }) + + It("does nothing when items are at or below the trigger", func() { + conv := &Conversation{Items: []*types.MessageItemUnion{user("1", "a")}} + s := &Session{CompactionEnabled: true, CompactionTrigger: 7, MaxHistoryItems: 4} + s.compact(conv, &fakeModel{predictResp: backend.LLMResponse{Response: "x"}}) + Expect(conv.Memory).To(Equal("")) + Expect(len(conv.Items)).To(Equal(1)) + }) +}) + +var _ = Describe("prefixMatches", func() { + user := func(id string) *types.MessageItemUnion { + return &types.MessageItemUnion{User: &types.MessageItemUser{ID: id}} + } + + It("matches when items begins with the snapshot ids in order", func() { + items := []*types.MessageItemUnion{user("1"), user("2"), user("3")} + snap := []*types.MessageItemUnion{user("1"), user("2")} + Expect(prefixMatches(items, snap)).To(BeTrue()) + }) + + It("matches an empty snapshot", func() { + Expect(prefixMatches([]*types.MessageItemUnion{user("1")}, nil)).To(BeTrue()) + }) + + It("fails when items is shorter than the snapshot (a concurrent delete shrank the head)", func() { + items := []*types.MessageItemUnion{user("1")} + snap := []*types.MessageItemUnion{user("1"), user("2")} + Expect(prefixMatches(items, snap)).To(BeFalse()) + }) + + It("fails when the head ids differ (a concurrent delete reordered the head)", func() { + items := []*types.MessageItemUnion{user("2"), user("3")} + snap := []*types.MessageItemUnion{user("1"), user("2")} + Expect(prefixMatches(items, snap)).To(BeFalse()) + }) +}) + +var _ = Describe("summarizerModel", func() { + It("returns the pipeline model when no summary_model is set", func() { + m := &fakeModel{} + s := &Session{ModelInterface: m} + Expect(s.summarizerModel()).To(Equal(m)) + }) + + It("uses the factory (once) when summary_model is set", func() { + pipeline := &fakeModel{} + small := &fakeModel{} + calls := 0 + s := &Session{ModelInterface: pipeline, SummaryModel: "tiny", + summarizerFactory: func() (Model, error) { calls++; return small, nil }} + Expect(s.summarizerModel()).To(Equal(small)) + Expect(s.summarizerModel()).To(Equal(small)) + Expect(calls).To(Equal(1)) + }) + + It("falls back to the pipeline model when the factory errors", func() { + pipeline := &fakeModel{} + s := &Session{ModelInterface: pipeline, SummaryModel: "tiny", + summarizerFactory: func() (Model, error) { return nil, errors.New("nope") }} + Expect(s.summarizerModel()).To(Equal(pipeline)) + }) +}) + +var _ = Describe("itemID", func() { + It("returns the id for each variant and empty for nil", func() { + Expect(itemID(nil)).To(Equal("")) + Expect(itemID(&types.MessageItemUnion{User: &types.MessageItemUser{ID: "u1"}})).To(Equal("u1")) + Expect(itemID(&types.MessageItemUnion{Assistant: &types.MessageItemAssistant{ID: "a1"}})).To(Equal("a1")) + Expect(itemID(&types.MessageItemUnion{System: &types.MessageItemSystem{ID: "s1"}})).To(Equal("s1")) + Expect(itemID(&types.MessageItemUnion{FunctionCall: &types.MessageItemFunctionCall{ID: "f1"}})).To(Equal("f1")) + Expect(itemID(&types.MessageItemUnion{FunctionCallOutput: &types.MessageItemFunctionCallOutput{ID: "o1"}})).To(Equal("o1")) + }) +}) diff --git a/docs/content/features/openai-realtime.md b/docs/content/features/openai-realtime.md index 48cfc93325f6..a6e99267efcb 100644 --- a/docs/content/features/openai-realtime.md +++ b/docs/content/features/openai-realtime.md @@ -68,6 +68,33 @@ pipeline: This is applied only to the realtime session's copy of the LLM config, so it does not affect other users of the same model. Leave it unset to use the LLM model config's own reasoning settings. +### Conversation compaction (long sessions on CPU) + +By default a realtime session feeds only the last `max_history_items` turns to the LLM; older turns are dropped and forgotten. On CPU, long calls also grow expensive as the prompt fills with verbatim history. Enable `compaction` to instead fold older turns into a rolling summary, so long calls stay cheap without losing earlier context. + +Compaction works with two numbers: + +- **`max_history_items`** is the *live window* — the recent turns kept verbatim in the prompt. +- **`compaction.trigger_items`** is the *high-water mark* — let the buffer grow to here, then summarize the overflow (everything above `max_history_items`) into a rolling memory and evict it. It must be greater than `max_history_items`; if it is not, it is clamped up. + +The gap between the two controls how often summarization runs: a summary call fires roughly every `(trigger_items - max_history_items)` turns (here, about every 6 turns). + +```yaml +pipeline: + max_history_items: 6 # live window — recent turns kept verbatim + compaction: + enabled: true + trigger_items: 12 # summarize overflow back down to max_history_items + summary_model: "" # optional: a small model for the summary (CPU); default = pipeline LLM + max_summary_tokens: 512 +``` + +{{% notice tip %}} +On CPU, set `summary_model` to a small, fast model so compaction never competes with the conversation LLM for compute. Left empty, the pipeline's own LLM produces the summary. +{{% /notice %}} + +Clients can also manage history directly via the now-supported `conversation.item.delete`, `conversation.item.truncate`, and `input_audio_buffer.clear` realtime events. + ## Transports The Realtime API supports two transports: **WebSocket** and **WebRTC**.