Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def self.build(options)
uri = options[:uri]
auth_secret = options[:auth_secret] || ForestAdminAgent::Facades::Container.cache(:auth_secret)
provided_introspection = options[:introspection]
provided_introspection_etag = options[:introspection_etag]

polling_interval = options[:schema_polling_interval_sec] ||
ENV['SCHEMA_POLLING_INTERVAL_SEC']&.to_i ||
Expand All @@ -28,7 +29,8 @@ def self.build(options)
uri,
auth_secret,
polling_interval: polling_interval,
introspection_schema: provided_introspection
introspection_schema: provided_introspection,
introspection_etag: provided_introspection_etag
) do
Thread.new do
logger = ForestAdminAgent::Facades::Container.logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ class SchemaPollingClient
MAX_POLLING_INTERVAL = 3600

def initialize(uri, auth_secret, polling_interval: DEFAULT_POLLING_INTERVAL, introspection_schema: nil,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function with many parameters (count = 6): initialize [qlty:function-parameters]

&on_schema_change)
introspection_etag: nil, &on_schema_change)
@uri = uri
@auth_secret = auth_secret
@polling_interval = polling_interval
@on_schema_change = on_schema_change
@closed = false
@introspection_schema = introspection_schema
@introspection_etag = introspection_etag
@current_schema = nil
@cached_etag = nil
@connection_attempts = 0
Expand Down Expand Up @@ -77,12 +78,12 @@ def check_schema
def compute_etag(schema)
return nil if schema.nil?

Digest::SHA1.hexdigest(schema.to_json)
Digest::SHA1.hexdigest(JSON.generate(schema))
end

def fetch_initial_schema_sync
# If we have an introspection schema, send its ETag to avoid re-downloading unchanged schema
introspection_etag = compute_etag(@introspection_schema) if @introspection_schema
introspection_etag = @introspection_etag || (@introspection_schema && compute_etag(@introspection_schema))
result = @rpc_client.fetch_schema('/forest/rpc-schema', if_none_match: introspection_etag)

if result == RpcClient::NotModified
Expand Down Expand Up @@ -115,8 +116,9 @@ def handle_initial_fetch_error(error)
if @introspection_schema
# Fallback to introspection schema - don't crash
@current_schema = @introspection_schema
@cached_etag = compute_etag(@current_schema)
@cached_etag = @introspection_etag || compute_etag(@current_schema)
@introspection_schema = nil
@introspection_etag = nil
@initial_sync_completed = true
ForestAdminAgent::Facades::Container.logger&.log(
'Warn',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ class Collection < ForestAdminDatasourceToolkit::Collection
include ForestAdminDatasourceRpc::Utils
include ForestAdminDatasourceCustomizer::Decorators::Action

def initialize(datasource, name, options, schema)
def initialize(datasource, name, schema)
super(datasource, name)
@options = options
@client = RpcClient.new(@options[:uri], @options[:auth_secret] || ForestAdminAgent::Facades::Container.cache(:auth_secret))
@client = datasource.shared_rpc_client
@rpc_collection_uri = "/forest/rpc/#{name}"
@base_params = { collection_name: name }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module ForestAdminDatasourceRpc
class Datasource < ForestAdminDatasourceToolkit::Datasource
include ForestAdminDatasourceRpc::Utils

attr_reader :shared_rpc_client

def initialize(options, introspection, schema_polling_client = nil)
super()

Expand All @@ -11,11 +13,15 @@ def initialize(options, introspection, schema_polling_client = nil)
"collections and #{introspection[:charts].length} charts."
)

@shared_rpc_client = RpcClient.new(
options[:uri],
options[:auth_secret] || ForestAdminAgent::Facades::Container.cache(:auth_secret)
)

introspection[:collections].each do |schema|
add_collection(Collection.new(self, schema[:name], options, schema))
add_collection(Collection.new(self, schema[:name], schema))
end

@options = options
@charts = introspection[:charts]
@rpc_relations = introspection[:rpc_relations]
@schema_polling_client = schema_polling_client
Expand All @@ -31,28 +37,29 @@ def initialize(options, introspection, schema_polling_client = nil)
end

def render_chart(caller, name)
client = RpcClient.new(@options[:uri], @options[:auth_secret] || ForestAdminAgent::Facades::Container.cache(:auth_secret))
url = 'forest/rpc-datasource-chart'

ForestAdminAgent::Facades::Container.logger.log(
'Debug',
"Forwarding datasource chart '#{name}' call to the Rpc agent on #{url}."
)

client.call_rpc(url, caller: caller, method: :post, payload: { chart: name })
@shared_rpc_client.call_rpc(url, caller: caller, method: :post, payload: { chart: name })
end

def execute_native_query(connection_name, query, binds)
client = RpcClient.new(@options[:uri], @options[:auth_secret] || ForestAdminAgent::Facades::Container.cache(:auth_secret))
url = 'forest/rpc-native-query'

ForestAdminAgent::Facades::Container.logger.log(
'Debug',
"Forwarding native query for connection '#{connection_name}' to the Rpc agent on #{url}."
)

result = client.call_rpc(url, method: :post,
payload: { connection_name: connection_name, query: query, binds: binds })
result = @shared_rpc_client.call_rpc(
url,
method: :post,
payload: { connection_name: connection_name, query: query, binds: binds }
)
ForestAdminDatasourceToolkit::Utils::HashHelper.convert_keys(result.to_a)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ module ForestAdminDatasourceRpc
'http://localhost',
'secret',
polling_interval: 600,
introspection_schema: nil
introspection_schema: nil,
introspection_etag: nil
)
end

Expand All @@ -121,7 +122,8 @@ module ForestAdminDatasourceRpc
'http://localhost',
'secret',
polling_interval: 120,
introspection_schema: nil
introspection_schema: nil,
introspection_etag: nil
)
end

Expand All @@ -134,7 +136,8 @@ module ForestAdminDatasourceRpc
'http://localhost',
'secret',
polling_interval: 30,
introspection_schema: nil
introspection_schema: nil,
introspection_etag: nil
)
ensure
ENV.delete('SCHEMA_POLLING_INTERVAL_SEC')
Expand All @@ -149,11 +152,43 @@ module ForestAdminDatasourceRpc
'http://localhost',
'secret',
polling_interval: 120,
introspection_schema: nil
introspection_schema: nil,
introspection_etag: nil
)
ensure
ENV.delete('SCHEMA_POLLING_INTERVAL_SEC')
end

it 'passes introspection_etag to schema polling client when provided' do
described_class.build({
uri: 'http://localhost',
introspection: introspection,
introspection_etag: 'precomputed-etag-123'
})

expect(Utils::SchemaPollingClient).to have_received(:new).with(
'http://localhost',
'secret',
polling_interval: 600,
introspection_schema: introspection,
introspection_etag: 'precomputed-etag-123'
)
end

it 'passes introspection_schema without etag when only introspection is provided' do
described_class.build({
uri: 'http://localhost',
introspection: introspection
})

expect(Utils::SchemaPollingClient).to have_received(:new).with(
'http://localhost',
'secret',
polling_interval: 600,
introspection_schema: introspection,
introspection_etag: nil
)
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ module ForestAdminDatasourceRpc

include_examples 'with introspection'

context 'when initialized' do
it 'uses the datasource shared RPC client' do
expect(collection.instance_variable_get(:@client)).to eq(datasource.shared_rpc_client)
end
end

context 'when call list' do
it 'forward the call to the server' do
collection.list(caller, Filter.new, Projection.new)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ module ForestAdminDatasourceRpc
expect(datasource.live_query_connections).to eq({})
end

it 'creates a shared RPC client' do
expect(datasource.shared_rpc_client).to eq(rpc_client)
end

it 'creates RPC client with correct uri and auth_secret' do
described_class.new({ uri: 'http://localhost', auth_secret: 'custom-secret' }, introspection)

expect(Utils::RpcClient).to have_received(:new).with('http://localhost', 'custom-secret')
end

it 'uses Container auth_secret when not provided in options' do
described_class.new({ uri: 'http://localhost' }, introspection)

expect(Utils::RpcClient).to have_received(:new).with('http://localhost', 'secret')
end

context 'with schema polling client' do
let(:schema_polling_client) { instance_double(Utils::SchemaPollingClient, stop: nil) }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ module Utils
end.not_to raise_error
end
end

context 'with introspection_etag parameter' do
it 'stores introspection_etag when provided' do
client = described_class.new(uri, secret, introspection_etag: 'provided-etag-123') { |schema| callback.call(schema) }

expect(client.instance_variable_get(:@introspection_etag)).to eq('provided-etag-123')
end

it 'defaults introspection_etag to nil' do
client = described_class.new(uri, secret) { |schema| callback.call(schema) }

expect(client.instance_variable_get(:@introspection_etag)).to be_nil
end
end
end

describe '#start' do
Expand Down Expand Up @@ -314,6 +328,104 @@ module Utils
end
end

describe '#start? with introspection_etag' do
let(:rpc_client) { instance_double(RpcClient) }
let(:introspection) { { collections: [{ name: 'Products' }], charts: [] } }

before do
allow(RpcClient).to receive(:new).and_return(rpc_client)
end

it 'uses provided introspection_etag in initial fetch instead of computing it' do
client = described_class.new(
uri, secret,
introspection_schema: introspection,
introspection_etag: 'precomputed-etag-abc'
) { |s| callback.call(s) }

allow(rpc_client).to receive(:fetch_schema).and_return(RpcClient::NotModified)

client.start?
client.stop

# Should use the provided etag, not compute one from the schema
expect(rpc_client).to have_received(:fetch_schema).with(
'/forest/rpc-schema',
if_none_match: 'precomputed-etag-abc'
)
end

it 'computes etag from introspection_schema when introspection_etag is not provided' do
client = described_class.new(
uri, secret,
introspection_schema: introspection
) { |s| callback.call(s) }

allow(rpc_client).to receive(:fetch_schema).and_return(RpcClient::NotModified)

client.start?
client.stop

# Should compute etag from the schema
expected_etag = Digest::SHA1.hexdigest(JSON.generate(introspection))
expect(rpc_client).to have_received(:fetch_schema).with(
'/forest/rpc-schema',
if_none_match: expected_etag
)
end

it 'uses provided introspection_etag as cached_etag on NotModified response' do
client = described_class.new(
uri, secret,
introspection_schema: introspection,
introspection_etag: 'precomputed-etag-xyz'
) { |s| callback.call(s) }

allow(rpc_client).to receive(:fetch_schema).and_return(RpcClient::NotModified)

client.start?
client.stop

expect(client.instance_variable_get(:@cached_etag)).to eq('precomputed-etag-xyz')
end

it 'uses provided introspection_etag when falling back to introspection_schema on error' do
client = described_class.new(
uri, secret,
introspection_schema: introspection,
introspection_etag: 'fallback-etag-123'
) { |s| callback.call(s) }

allow(rpc_client).to receive(:fetch_schema).and_raise(Faraday::ConnectionFailed, 'Connection refused')

client.start?
client.stop

# Should use the introspection schema as current schema
expect(client.instance_variable_get(:@current_schema)).to eq(introspection)
# Should use the provided etag
expect(client.instance_variable_get(:@cached_etag)).to eq('fallback-etag-123')
# Should clear introspection fields
expect(client.instance_variable_get(:@introspection_schema)).to be_nil
expect(client.instance_variable_get(:@introspection_etag)).to be_nil
end

it 'computes etag when falling back without introspection_etag' do
client = described_class.new(
uri, secret,
introspection_schema: introspection
) { |s| callback.call(s) }

allow(rpc_client).to receive(:fetch_schema).and_raise(Faraday::ConnectionFailed, 'Connection refused')

client.start?
client.stop

expected_etag = Digest::SHA1.hexdigest(JSON.generate(introspection))
expect(client.instance_variable_get(:@cached_etag)).to eq(expected_etag)
end
end

describe '#trigger_schema_change_callback' do
it 'executes the callback with schema' do
client = described_class.new(uri, secret) { |schema| callback.call(schema) }
Expand Down