Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 123 additions & 2 deletions src/ngx_http_lua_socket_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ ngx_http_lua_socket_tcp_create_socket_pool(lua_State *L, ngx_http_request_t *r,
for (i = 0; i < pool_size; i++) {
ngx_queue_insert_head(&sp->free, &items[i].queue);
items[i].socket_pool = sp;
items[i].on_push_cb_ref = LUA_NOREF;
}

*spool = sp;
Expand Down Expand Up @@ -1003,6 +1004,7 @@ ngx_http_lua_socket_tcp_connect(lua_State *L)
int key_index;
ngx_int_t backlog;
ngx_int_t pool_size;
int on_push_cb_ref;
ngx_str_t key;
const char *msg;

Expand Down Expand Up @@ -1036,6 +1038,7 @@ ngx_http_lua_socket_tcp_connect(lua_State *L)
key_index = 2;
pool_size = 0;
custom_pool = 0;
on_push_cb_ref = LUA_NOREF;
llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module);

if (lua_type(L, n) == LUA_TTABLE) {
Expand Down Expand Up @@ -1080,6 +1083,20 @@ ngx_http_lua_socket_tcp_connect(lua_State *L)

lua_pop(L, 1);

lua_getfield(L, n, "on_push");

if (lua_isfunction(L, -1)) {
on_push_cb_ref = luaL_ref(L, LUA_REGISTRYINDEX);
} else {
if (!lua_isnil(L, -1)) {
msg = lua_pushfstring(L, "bad \"on_push\" option type: %s",
lua_typename(L, lua_type(L, -1)));
return luaL_argerror(L, n, msg);
}

lua_pop(L, 1);
}

lua_getfield(L, n, "pool");

switch (lua_type(L, -1)) {
Expand Down Expand Up @@ -1217,6 +1234,8 @@ ngx_http_lua_socket_tcp_connect(lua_State *L)

ngx_memzero(u, sizeof(ngx_http_lua_socket_tcp_upstream_t));

u->on_push_cb_ref = on_push_cb_ref;

u->request = r; /* set the controlling request */

u->conf = llcf;
Expand Down Expand Up @@ -4524,12 +4543,22 @@ ngx_http_lua_socket_tcp_finalize(ngx_http_request_t *r,
{
ngx_connection_t *c;
ngx_http_lua_socket_pool_t *spool;
lua_State *L;

dd("request: %p, u: %p, u->cleanup: %p", r, u, u->cleanup);

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua finalize socket");

if (u->on_push_cb_ref != LUA_NOREF) {
L = ngx_http_lua_get_lua_vm(r, NULL);
if (L != NULL) {
luaL_unref(L, LUA_REGISTRYINDEX, u->on_push_cb_ref);
}
u->on_push_cb_ref = LUA_NOREF;
}


if (u->cleanup) {
*u->cleanup = NULL;
ngx_http_lua_cleanup_free(r, u->cleanup);
Expand Down Expand Up @@ -5643,8 +5672,7 @@ ngx_http_lua_socket_tcp_setkeepalive(lua_State *L)
/* When the server closes the connection,
* epoll will return EPOLLRDHUP event and nginx will set pending_eof.
*/
if (c == NULL || u->read_closed || u->write_closed
|| c->read->eof || c->read->pending_eof)
if (c == NULL || u->read_closed || u->write_closed)
{
lua_pushnil(L);
lua_pushliteral(L, "closed");
Expand Down Expand Up @@ -5740,6 +5768,14 @@ ngx_http_lua_socket_tcp_setkeepalive(lua_State *L)

item = ngx_queue_data(q, ngx_http_lua_socket_pool_item_t, queue);

if (item->on_push_cb_ref != LUA_NOREF) {
lua_State *evict_L = spool->lua_vm;
if (evict_L != NULL) {
luaL_unref(evict_L, LUA_REGISTRYINDEX, item->on_push_cb_ref);
}
item->on_push_cb_ref = LUA_NOREF;
}

ngx_http_lua_socket_tcp_close_connection(item->connection);

/* only decrease the counter for connections which were counted */
Expand All @@ -5766,6 +5802,9 @@ ngx_http_lua_socket_tcp_setkeepalive(lua_State *L)
item->connection = c;
ngx_queue_insert_head(&spool->cache, q);

item->on_push_cb_ref = u->on_push_cb_ref;
u->on_push_cb_ref = LUA_NOREF;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"lua tcp socket clear current socket connection");

Expand Down Expand Up @@ -5983,6 +6022,71 @@ ngx_http_lua_socket_keepalive_close_handler(ngx_event_t *ev)
return NGX_OK;
}

item = c->data;

if (n > 0 && item->on_push_cb_ref != LUA_NOREF) {
char rbuf[4096];
ssize_t nread;
lua_State *L;
int close_conn;
int nret;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
"lua tcp socket keepalive: data received, calling cb");

spool = item->socket_pool;
L = spool->lua_vm;
if (L == NULL) {
goto close;
}

rbuf[0] = buf[0];

/* read the available data into a stack buffer; no request pool needed */
nread = c->recv(c, rbuf + 1, sizeof(rbuf) - 1);
if (nread <= 0) {
goto close;
}

lua_rawgeti(L, LUA_REGISTRYINDEX, item->on_push_cb_ref);
lua_pushlstring(L, rbuf, (size_t) nread);

/* callback(data) -> reply:string|nil, close:bool */
if (lua_pcall(L, 1, 2, 0) != LUA_OK) {
ngx_log_error(NGX_LOG_ERR, ev->log, 0,
"lua tcp socket keepalive callback error: %s",
lua_tostring(L, -1));
lua_pop(L, 1);
goto close;
}

/* stack: reply(-2), close(-1) */
if (lua_type(L, -2) == LUA_TSTRING) {
size_t slen;
const char *sdata = lua_tolstring(L, -2, &slen);
ssize_t nsent = 0;

while ((size_t) nsent < slen) {
ssize_t w = c->send(c, sdata + nsent, slen - nsent, 0);
if (w <= 0) {
lua_pop(L, 2);
goto close;
}
nsent += w;
}
}

close_conn = !lua_toboolean(L, -1);
lua_pop(L, 2);

if (!close_conn) {
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
goto close;
}
return NGX_OK;
}
}

close:

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, 0,
Expand All @@ -5991,6 +6095,14 @@ ngx_http_lua_socket_keepalive_close_handler(ngx_event_t *ev)
item = c->data;
spool = item->socket_pool;

if (item->on_push_cb_ref != LUA_NOREF) {
lua_State *close_L = spool->lua_vm;
if (close_L != NULL) {
luaL_unref(close_L, LUA_REGISTRYINDEX, item->on_push_cb_ref);
}
item->on_push_cb_ref = LUA_NOREF;
}

ngx_http_lua_socket_tcp_close_connection(c);

ngx_queue_remove(&item->queue);
Expand Down Expand Up @@ -6042,6 +6154,15 @@ ngx_http_lua_socket_shutdown_pool_helper(ngx_http_lua_socket_pool_t *spool)
q = ngx_queue_head(&spool->cache);

item = ngx_queue_data(q, ngx_http_lua_socket_pool_item_t, queue);

if (item->on_push_cb_ref != LUA_NOREF) {
lua_State *close_L = spool->lua_vm;
if (close_L != NULL) {
luaL_unref(close_L, LUA_REGISTRYINDEX, item->on_push_cb_ref);
}
item->on_push_cb_ref = LUA_NOREF;
}

c = item->connection;

ngx_http_lua_socket_tcp_close_connection(c);
Expand Down
3 changes: 3 additions & 0 deletions src/ngx_http_lua_socket_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ struct ngx_http_lua_socket_tcp_upstream_s {
ngx_http_lua_co_ctx_t *read_co_ctx;
ngx_http_lua_co_ctx_t *write_co_ctx;

int on_push_cb_ref;

ngx_uint_t reused;
struct sockaddr_storage sockaddr;
socklen_t socklen;
Expand Down Expand Up @@ -189,6 +191,7 @@ typedef struct {
char host[COSOCKET_HOST_LEN];

ngx_uint_t reused;
int on_push_cb_ref;

ngx_http_lua_socket_udata_queue_t *udata_queue;
} ngx_http_lua_socket_pool_item_t;
Expand Down
Loading