Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions src/agent/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ export class ModelClient {

const isAnthropic = request.model.startsWith('anthropic/');
const isGLM = request.model.startsWith('zai/') || request.model.includes('glm');
const isGeminiThinkingRequired =
request.model.startsWith('google/gemini-3.1') ||
request.model.startsWith('google/gemini-2.5-pro');

// Build the request payload, injecting model-specific optimizations
let requestPayload: Record<string, unknown> = { ...request, stream: true };
Expand Down Expand Up @@ -482,6 +485,30 @@ export class ModelClient {
}
}

// Gemini Pro reasoning models reject a missing/zero thinking budget. Normalize
// the gateway default so fallback routing doesn't fail with "Budget 0 is invalid."
if (isGeminiThinkingRequired) {
// The gateway's streaming path currently drops Gemini's thinking budget;
// non-streaming preserves it. We convert the JSON response back into the
// same internal chunks below so callers keep one code path.
requestPayload['stream'] = false;
const maxOut = request.max_tokens ?? 16_384;
const budgetTokens = Math.min(maxOut, 8_192);
const thinking = requestPayload['thinking'];
if (thinking && typeof thinking === 'object' && !Array.isArray(thinking)) {
requestPayload['thinking'] = {
...thinking,
type: 'enabled',
budget_tokens: budgetTokens,
};
} else {
requestPayload['thinking'] = {
type: 'enabled',
budget_tokens: budgetTokens,
};
}
}

if (isAnthropic) {
// ─ Anthropic extended thinking ──────────────────────────────────────
// Enable the `thinking` API block only for models that accept it.
Expand Down Expand Up @@ -677,13 +704,70 @@ export class ModelClient {
}
}

if (requestPayload['stream'] === false) {
yield* this.parseNonStreamingMessage(response, request.model);
return;
}

// Parse SSE stream
yield* this.parseSSEStream(response, requestController, streamTimeoutMs, request.model);
} finally {
unlinkAbort();
}
}

/**
 * Converts a complete (non-SSE) JSON message response into the same
 * StreamChunk sequence the SSE parser produces, so callers keep a single
 * consumption code path when we force `stream: false` upstream.
 *
 * Chunks are synthesized to match the SSE wire shape: `message_start`
 * carries the message with empty `content`, and `tool_use` start blocks
 * carry an empty `input`, with the actual data delivered via deltas.
 * Emitting the full data in both the start event and the deltas would
 * double-apply for consumers that accumulate deltas onto the start
 * snapshot, which is how the real stream behaves.
 *
 * @param response Fetch response whose body is a full message JSON object.
 * @param model    Model id, used only for debug logging.
 */
private async *parseNonStreamingMessage(
  response: Response,
  model: string,
): AsyncGenerator<StreamChunk> {
  const parsed = await response.json() as Record<string, unknown>;
  const content = Array.isArray(parsed['content'])
    ? parsed['content'] as Record<string, unknown>[]
    : [];

  // Mirror SSE: message_start's message has no content yet; deltas fill it in.
  yield { kind: 'message_start', payload: { message: { ...parsed, content: [] } } };

  for (let index = 0; index < content.length; index++) {
    const block = content[index];

    if (block.type === 'tool_use') {
      // Mirror SSE: tool_use starts with an empty input object; the tool
      // arguments arrive through input_json_delta.
      yield {
        kind: 'content_block_start',
        payload: { index, content_block: { ...block, input: {} } },
      };
      yield {
        kind: 'content_block_delta',
        payload: { index, delta: { type: 'input_json_delta', partial_json: JSON.stringify(block.input ?? {}) } },
      };
    } else {
      yield { kind: 'content_block_start', payload: { index, content_block: block } };

      if (block.type === 'text' && typeof block.text === 'string') {
        yield {
          kind: 'content_block_delta',
          payload: { index, delta: { type: 'text_delta', text: block.text } },
        };
      } else if (block.type === 'thinking' && typeof block.thinking === 'string') {
        yield {
          kind: 'content_block_delta',
          payload: { index, delta: { type: 'thinking_delta', thinking: block.thinking } },
        };
        if (typeof block.signature === 'string') {
          // Signatures arrive as a trailing signature_delta on the real stream.
          yield {
            kind: 'content_block_delta',
            payload: { index, delta: { type: 'signature_delta', signature: block.signature } },
          };
        }
      }
      // NOTE(review): other block types (e.g. redacted_thinking) pass through
      // as start/stop only with no delta — confirm that matches consumers.
    }

    yield { kind: 'content_block_stop', payload: { index } };
  }

  yield {
    kind: 'message_delta',
    payload: {
      delta: { stop_reason: parsed['stop_reason'] ?? 'end_turn' },
      usage: parsed['usage'] ?? {},
    },
  };
  yield { kind: 'message_stop', payload: {} };

  if (this.debug) {
    console.error(`[franklin] Parsed non-streaming response for ${model}`);
  }
}

/**
* Non-streaming completion for simple requests.
*/
Expand Down
Loading