diff --git a/src/ngx_http_lua_socket_tcp.c b/src/ngx_http_lua_socket_tcp.c index 93b0d119e1..4f9cf9fd19 100644 --- a/src/ngx_http_lua_socket_tcp.c +++ b/src/ngx_http_lua_socket_tcp.c @@ -576,6 +576,7 @@ ngx_http_lua_socket_tcp_create_socket_pool(lua_State *L, ngx_http_request_t *r, for (i = 0; i < pool_size; i++) { ngx_queue_insert_head(&sp->free, &items[i].queue); items[i].socket_pool = sp; + items[i].on_push_cb_ref = LUA_NOREF; } *spool = sp; @@ -1003,6 +1004,7 @@ ngx_http_lua_socket_tcp_connect(lua_State *L) int key_index; ngx_int_t backlog; ngx_int_t pool_size; + int on_push_cb_ref; ngx_str_t key; const char *msg; @@ -1036,6 +1038,7 @@ ngx_http_lua_socket_tcp_connect(lua_State *L) key_index = 2; pool_size = 0; custom_pool = 0; + on_push_cb_ref = LUA_NOREF; llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); if (lua_type(L, n) == LUA_TTABLE) { @@ -1080,6 +1083,20 @@ ngx_http_lua_socket_tcp_connect(lua_State *L) lua_pop(L, 1); + lua_getfield(L, n, "on_push"); + + if (lua_isfunction(L, -1)) { + on_push_cb_ref = luaL_ref(L, LUA_REGISTRYINDEX); + } else { + if (!lua_isnil(L, -1)) { + msg = lua_pushfstring(L, "bad \"on_push\" option type: %s", + lua_typename(L, lua_type(L, -1))); + return luaL_argerror(L, n, msg); + } + + lua_pop(L, 1); + } + lua_getfield(L, n, "pool"); switch (lua_type(L, -1)) { @@ -1217,6 +1234,8 @@ ngx_http_lua_socket_tcp_connect(lua_State *L) ngx_memzero(u, sizeof(ngx_http_lua_socket_tcp_upstream_t)); + u->on_push_cb_ref = on_push_cb_ref; + u->request = r; /* set the controlling request */ u->conf = llcf; @@ -4524,12 +4543,22 @@ ngx_http_lua_socket_tcp_finalize(ngx_http_request_t *r, { ngx_connection_t *c; ngx_http_lua_socket_pool_t *spool; + lua_State *L; dd("request: %p, u: %p, u->cleanup: %p", r, u, u->cleanup); ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua finalize socket"); + if (u->on_push_cb_ref != LUA_NOREF) { + L = ngx_http_lua_get_lua_vm(r, NULL); + if (L != NULL) { + luaL_unref(L, LUA_REGISTRYINDEX, u->on_push_cb_ref); + } + u->on_push_cb_ref = LUA_NOREF; + } + + if (u->cleanup) { *u->cleanup = NULL; ngx_http_lua_cleanup_free(r, u->cleanup); @@ -5643,8 +5672,7 @@ ngx_http_lua_socket_tcp_setkeepalive(lua_State *L) /* When the server closes the connection, * epoll will return EPOLLRDHUP event and nginx will set pending_eof. */ - if (c == NULL || u->read_closed || u->write_closed - || c->read->eof || c->read->pending_eof) + if (c == NULL || u->read_closed || u->write_closed) { lua_pushnil(L); lua_pushliteral(L, "closed"); @@ -5740,6 +5768,14 @@ ngx_http_lua_socket_tcp_setkeepalive(lua_State *L) item = ngx_queue_data(q, ngx_http_lua_socket_pool_item_t, queue); + if (item->on_push_cb_ref != LUA_NOREF) { + lua_State *evict_L = spool->lua_vm; + if (evict_L != NULL) { + luaL_unref(evict_L, LUA_REGISTRYINDEX, item->on_push_cb_ref); + } + item->on_push_cb_ref = LUA_NOREF; + } + ngx_http_lua_socket_tcp_close_connection(item->connection); /* only decrease the counter for connections which were counted */ @@ -5766,6 +5802,9 @@ ngx_http_lua_socket_tcp_setkeepalive(lua_State *L) item->connection = c; ngx_queue_insert_head(&spool->cache, q); + item->on_push_cb_ref = u->on_push_cb_ref; + u->on_push_cb_ref = LUA_NOREF; + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, "lua tcp socket clear current socket connection"); @@ -5983,6 +6022,71 @@ ngx_http_lua_socket_keepalive_close_handler(ngx_event_t *ev) return NGX_OK; } + item = c->data; + + if (n > 0 && item->on_push_cb_ref != LUA_NOREF) { + char rbuf[4096]; + ssize_t nread; + lua_State *L; + int close_conn; + int nret; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0, + "lua tcp socket keepalive: data received, calling cb"); + + spool = item->socket_pool; + L = spool->lua_vm; + if (L == NULL) { + goto close; + } + + rbuf[0] = buf[0]; + + /* read the available data into a stack buffer; no request pool needed */ + nread = c->recv(c, rbuf + 1, sizeof(rbuf) - 1); + if (nread <= 0) { + goto close; + } + + lua_rawgeti(L, LUA_REGISTRYINDEX, item->on_push_cb_ref); + lua_pushlstring(L, rbuf, (size_t) nread); + + /* callback(data) -> reply:string|nil, close:bool */ + if (lua_pcall(L, 1, 2, 0) != LUA_OK) { + ngx_log_error(NGX_LOG_ERR, ev->log, 0, + "lua tcp socket keepalive callback error: %s", + lua_tostring(L, -1)); + lua_pop(L, 1); + goto close; + } + + /* stack: reply(-2), close(-1) */ + if (lua_type(L, -2) == LUA_TSTRING) { + size_t slen; + const char *sdata = lua_tolstring(L, -2, &slen); + ssize_t nsent = 0; + + while ((size_t) nsent < slen) { + ssize_t w = c->send(c, sdata + nsent, slen - nsent, 0); + if (w <= 0) { + lua_pop(L, 2); + goto close; + } + nsent += w; + } + } + + close_conn = !lua_toboolean(L, -1); + lua_pop(L, 2); + + if (!close_conn) { + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + goto close; + } + return NGX_OK; + } + } + close: ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, 0, @@ -5991,6 +6095,14 @@ ngx_http_lua_socket_keepalive_close_handler(ngx_event_t *ev) item = c->data; spool = item->socket_pool; + if (item->on_push_cb_ref != LUA_NOREF) { + lua_State *close_L = spool->lua_vm; + if (close_L != NULL) { + luaL_unref(close_L, LUA_REGISTRYINDEX, item->on_push_cb_ref); + } + item->on_push_cb_ref = LUA_NOREF; + } + ngx_http_lua_socket_tcp_close_connection(c); ngx_queue_remove(&item->queue); @@ -6042,6 +6154,15 @@ ngx_http_lua_socket_shutdown_pool_helper(ngx_http_lua_socket_pool_t *spool) q = ngx_queue_head(&spool->cache); item = ngx_queue_data(q, ngx_http_lua_socket_pool_item_t, queue); + + if (item->on_push_cb_ref != LUA_NOREF) { + lua_State *close_L = spool->lua_vm; + if (close_L != NULL) { + luaL_unref(close_L, LUA_REGISTRYINDEX, item->on_push_cb_ref); + } + item->on_push_cb_ref = LUA_NOREF; + } + c = item->connection; ngx_http_lua_socket_tcp_close_connection(c); diff --git a/src/ngx_http_lua_socket_tcp.h b/src/ngx_http_lua_socket_tcp.h index 6cc6fc5876..8abd0ee5e6 100644 --- a/src/ngx_http_lua_socket_tcp.h +++ b/src/ngx_http_lua_socket_tcp.h @@ -123,6 +123,8 @@ struct ngx_http_lua_socket_tcp_upstream_s { ngx_http_lua_co_ctx_t *read_co_ctx; ngx_http_lua_co_ctx_t *write_co_ctx; + int on_push_cb_ref; + ngx_uint_t reused; struct sockaddr_storage sockaddr; socklen_t socklen; @@ -189,6 +191,7 @@ typedef struct { char host[COSOCKET_HOST_LEN]; ngx_uint_t reused; + int on_push_cb_ref; ngx_http_lua_socket_udata_queue_t *udata_queue; } ngx_http_lua_socket_pool_item_t;