//дк src/network-mysqld-proxy.c /** * connect to a backend * * @return * RET_SUCCESS - connected successfully * RET_ERROR_RETRY - connecting backend failed, call again to connect to another backend * RET_ERROR - no backends available, adds a ERR packet to the client queue */ NETWORK_MYSQLD_PLUGIN_PROTO(proxy_connect_server) { plugin_con_state *st = con->plugin_con_state; plugin_srv_state *g = st->global_state; guint min_connected_clients = G_MAXUINT; guint i; GTimeVal now; gboolean use_pooled_connection = FALSE; if (con->server) { int so_error = 0; socklen_t so_error_len = sizeof(so_error); //g_debug("%s.%d: called a 2nd time after a connect(),the %d backend: %s", // __FILE__,__LINE__,st->backend_ndx,con->server->addr.str); /** * we might get called a 2nd time after a connect() == EINPROGRESS */ if (getsockopt(con->server->fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_len)) { /* getsockopt failed */ g_critical("%s.%d: getsockopt(%s) failed: %s", __FILE__, __LINE__, con->server->addr.str, strerror(errno)); return RET_ERROR; } if (so_error==0) { if (st->backend->state != BACKEND_STATE_UP) { st->backend->state = BACKEND_STATE_UP; g_get_current_time(&(st->backend->state_since)); } con->state = CON_STATE_READ_HANDSHAKE; return RET_SUCCESS; } else { g_critical("%s.%d: connect(%s) failed: %s", __FILE__, __LINE__, con->server->addr.str, strerror(so_error)); st->backend->state = BACKEND_STATE_DOWN; g_get_current_time(&(st->backend->state_since)); backend_t *cur = g->backend_pool->pdata[st->backend_ndx]; cur->state == BACKEND_STATE_DOWN; g_get_current_time(&(cur->state_since)); //cur->connected_clients = 0; st->backend->connected_clients --; network_socket_free(con->server); con->server = NULL; //return } } st->backend = NULL; st->backend_ndx = -1; g_get_current_time(&now); if (now.tv_sec - g->backend_last_check.tv_sec > 1) { /* check once a second if we have to wakeup a connection */ for (i = 0; i < g->backend_pool->len; i++) { backend_t *cur = g->backend_pool->pdata[i]; if (cur->state != BACKEND_STATE_DOWN) continue; /* check if a backend is marked as down for more than 10 sec */ if (now.tv_sec - cur->state_since.tv_sec > 4) { g_debug("%s.%d: backend %s was down for more than 10 sec, waking it up", __FILE__, __LINE__, cur->addr.str); cur->state = BACKEND_STATE_UNKNOWN; cur->state_since = now; } } } switch (proxy_lua_connect_server(con)) { case PROXY_SEND_RESULT: /* we answered directly ... like denial ... * * for sure we have something in the send-queue * */ return RET_SUCCESS; case PROXY_NO_DECISION: /* just go on */ break; case PROXY_IGNORE_RESULT: use_pooled_connection = TRUE; break; default: g_error("%s.%d: ... ", __FILE__, __LINE__); break; } /** * if the current backend is down, ignore it */ //g_debug("%s.%d: st->backend_ndx = %d, g->backend_pool->len=%d",__FILE__,__LINE__,st->backend_ndx, g->backend_pool->len); if (st->backend_ndx >= 0 && st->backend_ndx < g->backend_pool->len) { backend_t *cur = g->backend_pool->pdata[st->backend_ndx]; if (cur->state == BACKEND_STATE_DOWN) { st->backend_ndx = -1; } } if (con->server && !use_pooled_connection) { gint bndx = st->backend_ndx; /* we already have a connection assigned, * but the script said we don't want to use it */ proxy_connection_pool_add_connection(con); st->backend_ndx = bndx; //g_debug("%s.%d: st->backend_ndx = %d ",__FILE__,__LINE__,st->backend_ndx); } //g_debug("%s.%d: st->backend_ndx = %d ",__FILE__,__LINE__,st->backend_ndx); if (st->backend_ndx < 0) { /** * we can choose between different back addresses * * prefer SQF (shorted queue first) to load all backends equally */ for (i = 0; i < g->backend_pool->len; i++) { backend_t *cur = g->backend_pool->pdata[i]; /** * skip backends which are down or not writable */ if (cur->state == BACKEND_STATE_DOWN || cur->type != BACKEND_TYPE_RW) continue; //choose the backend server with less connections,change the backend_ndx--- //g_debug("%s.%d: The backend: %s. connected clients: %d.", // __FILE__,__LINE__,cur->addr.str,cur->connected_clients); if (cur->connected_clients < min_connected_clients) { st->backend_ndx = i; min_connected_clients = cur->connected_clients; //g_debug("%s.%d: backend_ndx=%d,min_connected_clients=%d",__FILE__,__LINE__,st->backend_ndx,min_connected_clients); } } //g_debug("%s.%d: select the %d backend",__FILE__,__LINE__,st->backend_ndx); if (st->backend_ndx >= 0 && st->backend_ndx < g->backend_pool->len) { st->backend = g->backend_pool->pdata[st->backend_ndx]; } } else if (NULL == st->backend && st->backend_ndx >= 0 && st->backend_ndx < g->backend_pool->len) { st->backend = g->backend_pool->pdata[st->backend_ndx]; } if (NULL == st->backend) { network_mysqld_con_send_error(con->client, C("(proxy) all backends are down")); return RET_ERROR; } /** * check if we have a connection in the pool for this backend */ if (NULL == con->server) { int ioctlvar; con->server = network_socket_init(); con->server->addr = st->backend->addr; con->server->addr.str = g_strdup(st->backend->addr.str); //g_debug("%s.%d: Connect to %s, it has %d connected clients",__FILE__,__LINE__,st->backend->addr.str,st->backend->connected_clients); st->backend->connected_clients++; if (0 != network_mysqld_con_connect(con->server)) { if (errno == E_NET_INPROGRESS || errno == E_NET_WOULDBLOCK) { return RET_ERROR_RETRY; } g_message("%s.%d: connecting to backend (%s) failed, marking it as down for ...", __FILE__, __LINE__, con->server->addr.str); st->backend->state = BACKEND_STATE_DOWN; g_get_current_time(&(st->backend->state_since)); network_socket_free(con->server); con->server = NULL; return RET_ERROR_RETRY; } if (st->backend->state != BACKEND_STATE_UP) { st->backend->state = BACKEND_STATE_UP; g_get_current_time(&(st->backend->state_since)); } con->state = CON_STATE_READ_HANDSHAKE; } else { /** * send the old hand-shake packet */ /* remove the idle-handler from the socket */ network_queue_append(con->client->send_queue, con->server->auth_handshake_packet->str, con->server->auth_handshake_packet->len, 0); /* packet-id */ con->state = CON_STATE_SEND_HANDSHAKE; /** * connect_clients is already incremented */ } return RET_SUCCESS; }