From 40e3d145bcd5ebe0471a7a640ff0ac21a23d3f8e Mon Sep 17 00:00:00 2001 From: Alex Kuznicki Date: Wed, 24 Jun 2026 15:04:40 -0500 Subject: [PATCH] Improve HTTP transport connection pooling for pipeline adapters Add configurable TransportConfig with higher default idle connection pool limits (100 per host vs Go's 2) to reduce connection churn for high-throughput bridge and http pipeline tasks. --- pkg/http/http.go | 23 ++++++++-------- pkg/http/transport.go | 56 ++++++++++++++++++++++++++++++++++++++ pkg/http/transport_test.go | 43 +++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 11 deletions(-) create mode 100644 pkg/http/transport.go create mode 100644 pkg/http/transport_test.go diff --git a/pkg/http/http.go b/pkg/http/http.go index 163446dc83..2b53d036ec 100644 --- a/pkg/http/http.go +++ b/pkg/http/http.go @@ -29,25 +29,26 @@ type clientConfig interface { // NewRestrictedClient returns a secure HTTP Client (queries to certain // local addresses are blocked) func NewRestrictedClient(cfg clientConfig, lggr logger.Logger) *http.Client { - tr := newDefaultTransport() + return NewRestrictedClientWithTransportConfig(cfg, lggr, TransportConfig{}) +} + +// NewRestrictedClientWithTransportConfig returns a secure HTTP Client with custom +// connection pool settings. +func NewRestrictedClientWithTransportConfig(cfg clientConfig, lggr logger.Logger, transportCfg TransportConfig) *http.Client { + tr := newDefaultTransport(transportCfg) tr.DialContext = makeRestrictedDialContext(cfg, lggr) return &http.Client{Transport: tr} } // NewUnrestrictedClient returns a HTTP Client with no Transport restrictions func NewUnrestrictedClient() *http.Client { - unrestrictedTr := newDefaultTransport() - return &http.Client{Transport: unrestrictedTr} + return NewUnrestrictedClientWithTransportConfig(TransportConfig{}) } -func newDefaultTransport() *http.Transport { - t := http.DefaultTransport.(*http.Transport).Clone() - // There are certain classes of vulnerabilities that open up when - // compression is enabled. For simplicity, we disable compression - // to cut off this class of attacks. - // https://www.cyberis.co.uk/2013/08/vulnerabilities-that-just-wont-die.html - t.DisableCompression = true - return t +// NewUnrestrictedClientWithTransportConfig returns an HTTP Client with custom +// connection pool settings and no transport restrictions. +func NewUnrestrictedClientWithTransportConfig(transportCfg TransportConfig) *http.Client { + return &http.Client{Transport: newDefaultTransport(transportCfg)} } type Client interface { diff --git a/pkg/http/transport.go b/pkg/http/transport.go new file mode 100644 index 0000000000..3254f4c1db --- /dev/null +++ b/pkg/http/transport.go @@ -0,0 +1,56 @@ +package http + +import ( + "net/http" + "time" +) + +// TransportConfig configures HTTP connection pooling for pipeline adapters. +// Zero values are replaced with DefaultTransportConfig before being applied. +type TransportConfig struct { + MaxIdleConns int + MaxIdleConnsPerHost int + IdleConnTimeout time.Duration +} + +// DefaultTransportConfig returns transport pool settings tuned for high-throughput +// bridge and pipeline HTTP adapters. +func DefaultTransportConfig() TransportConfig { + return TransportConfig{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + } +} + +func (c TransportConfig) withDefaults() TransportConfig { + defaults := DefaultTransportConfig() + if c.MaxIdleConns == 0 { + c.MaxIdleConns = defaults.MaxIdleConns + } + if c.MaxIdleConnsPerHost == 0 { + c.MaxIdleConnsPerHost = defaults.MaxIdleConnsPerHost + } + if c.IdleConnTimeout == 0 { + c.IdleConnTimeout = defaults.IdleConnTimeout + } + return c +} + +func (c TransportConfig) apply(t *http.Transport) { + cfg := c.withDefaults() + t.MaxIdleConns = cfg.MaxIdleConns + t.MaxIdleConnsPerHost = cfg.MaxIdleConnsPerHost + t.IdleConnTimeout = cfg.IdleConnTimeout +} + +func newDefaultTransport(cfg TransportConfig) *http.Transport { + t := http.DefaultTransport.(*http.Transport).Clone() + // There are certain classes of vulnerabilities that open up when + // compression is enabled. For simplicity, we disable compression + // to cut off this class of attacks. + // https://www.cyberis.co.uk/2013/08/vulnerabilities-that-just-wont-die.html + t.DisableCompression = true + cfg.apply(t) + return t +} diff --git a/pkg/http/transport_test.go b/pkg/http/transport_test.go new file mode 100644 index 0000000000..2100a163b8 --- /dev/null +++ b/pkg/http/transport_test.go @@ -0,0 +1,43 @@ +package http + +import ( + netHttp "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestDefaultTransportConfig(t *testing.T) { + t.Parallel() + + cfg := DefaultTransportConfig() + assert.Equal(t, 100, cfg.MaxIdleConns) + assert.Equal(t, 100, cfg.MaxIdleConnsPerHost) + assert.Equal(t, 90*time.Second, cfg.IdleConnTimeout) +} + +func TestUnrestrictedClientTransportPooling(t *testing.T) { + t.Parallel() + + client := NewUnrestrictedClient() + tr := client.Transport.(*netHttp.Transport) + assert.True(t, tr.DisableCompression) + assert.Equal(t, 100, tr.MaxIdleConns) + assert.Equal(t, 100, tr.MaxIdleConnsPerHost) + assert.Equal(t, 90*time.Second, tr.IdleConnTimeout) +} + +func TestUnrestrictedClientWithTransportConfig(t *testing.T) { + t.Parallel() + + client := NewUnrestrictedClientWithTransportConfig(TransportConfig{ + MaxIdleConns: 50, + MaxIdleConnsPerHost: 25, + IdleConnTimeout: 30 * time.Second, + }) + tr := client.Transport.(*netHttp.Transport) + assert.Equal(t, 50, tr.MaxIdleConns) + assert.Equal(t, 25, tr.MaxIdleConnsPerHost) + assert.Equal(t, 30*time.Second, tr.IdleConnTimeout) +}