From 6c4b5f8624fdcf67ddee7995e2ca3691c4ca77e1 Mon Sep 17 00:00:00 2001 From: Alban Crequy Date: Tue, 8 Jul 2014 16:26:21 +0100 Subject: [PATCH] pending replies: use better data structures than a linked list BusExpireList used to keep track of BusExpireItems/BusPendingReply in a unique linked list without knowledge of the BusPendingReply struct. Iterating on the linked list takes a linear time on the number of items. It is inefficient because it iterates on items not relevant to the DBusConnection considered. The total number of BusExpireItems on the bus can be quite large: with the default limits on the system bus, it is 2097152 per user: max_replies_per_connection * max_connections_per_user = (1024 * 8) * 256 The items in the BusExpireList are looked up in 3 different ways: 1. When sending a D-Bus method call in bus_connections_expect_reply(): check whether the exact triplet (DBusConnection caller, DBusConnection callee, dbus_uint32_t reply_serial) already exists and check whether the number of pending replies associated to the caller is over max_replies_per_connection. 2. When sending a D-Bus return call in bus_connections_check_reply(): find the exact triplet (DBusConnection caller, DBusConnection callee, dbus_uint32_t reply_serial) 3. When a DBusConnection disconnects, iterate on the pending replies associated either with the connection as a caller or as a callee. It becomes overly complicated to make a better data structure for theses use cases without BusExpireItem having a reference to the caller, callee and serial. BusExpireList was written in a generic way but is only used for pending replies. This patch gets rid of the generic programming and store each BusExpireItem in two hash tables keyed respectively by DBusConnection caller and by DBusConnection callee. In order to implement transactions correctly, a pending reply needs to be removed and re-added in the BusExpireList without memory allocations. It was done with linked lists by keeping the reference to the DBusList in CheckPendingReplyData. In this new implementation, BusExpireItem has a new field "dbus_bool_t deleted" and the BusExpireItem is only marked as deleted when preparing the transaction and removed effectively when the transaction finishes, with the bus_transaction_add_cancel_hook callbacks. https://bugs.freedesktop.org/show_bug.cgi?id=81053 --- bus/connection.c | 223 +++++++++++-------------- bus/expirelist.c | 483 +++++++++++++++++++++++++++++++++++++++++++++++-------- bus/expirelist.h | 39 +++-- 3 files changed, 532 insertions(+), 213 deletions(-) diff --git a/bus/connection.c b/bus/connection.c index ea2d155..1e61df8 100644 --- a/bus/connection.c +++ b/bus/connection.c @@ -39,17 +39,6 @@ static void bus_connection_remove_transactions (DBusConnection *connection); -typedef struct -{ - BusExpireItem expire_item; - - DBusConnection *will_get_reply; - DBusConnection *will_send_reply; - - dbus_uint32_t reply_serial; - -} BusPendingReply; - struct BusConnections { int refcount; @@ -105,7 +94,7 @@ typedef struct } BusConnectionData; static dbus_bool_t bus_pending_reply_expired (BusExpireList *list, - DBusList *link, + BusExpireItem *pending, void *data); static void bus_connection_drop_pending_replies (BusConnections *connections, @@ -1517,7 +1506,7 @@ bus_connections_check_limits (BusConnections *connections, } static void -bus_pending_reply_free (BusPendingReply *pending) +bus_pending_reply_free (BusExpireItem *pending) { _dbus_verbose ("Freeing pending reply %p, replier %p receiver %p serial %u\n", pending, @@ -1531,7 +1520,7 @@ bus_pending_reply_free (BusPendingReply *pending) static dbus_bool_t bus_pending_reply_send_no_reply (BusConnections *connections, BusTransaction *transaction, - BusPendingReply *pending) + BusExpireItem *pending) { DBusMessage *message; DBusMessageIter iter; @@ -1572,10 +1561,9 @@ bus_pending_reply_send_no_reply (BusConnections *connections, static dbus_bool_t bus_pending_reply_expired (BusExpireList *list, - DBusList *link, + BusExpireItem *pending, void *data) { - BusPendingReply *pending = link->data; BusConnections *connections = data; BusTransaction *transaction; @@ -1602,14 +1590,59 @@ bus_pending_reply_expired (BusExpireList *list, return FALSE; } - bus_expire_list_remove_link (connections->pending_replies, link); + bus_expire_list_mark_deleted (connections->pending_replies, pending); - bus_pending_reply_free (pending); bus_transaction_execute_and_free (transaction); return TRUE; } +static dbus_bool_t +drop_caller_cb (BusExpireList *list, + BusExpireItem *pending, + void *data) +{ + BusConnections *connections = data; + + /* We don't need to track this pending reply anymore */ + + _dbus_verbose ("Dropping pending reply %p, replier %p receiver %p serial %u\n", + pending, + pending->will_send_reply, + pending->will_get_reply, + pending->reply_serial); + + bus_expire_list_mark_deleted (connections->pending_replies, pending); + bus_expire_list_recheck_immediately (connections->pending_replies); + + return TRUE; +} + +static dbus_bool_t +drop_callee_cb (BusExpireList *list, + BusExpireItem *pending, + void *data) +{ + BusConnections *connections = data; + + /* The reply isn't going to be sent, so set things + * up so it will be expired right away + */ + _dbus_verbose ("Will expire pending reply %p, replier %p receiver %p serial %u\n", + pending, + pending->will_send_reply, + pending->will_get_reply, + pending->reply_serial); + + pending->will_send_reply = NULL; + pending->added_tv_sec = 0; + pending->added_tv_usec = 0; + + bus_expire_list_recheck_immediately (connections->pending_replies); + + return TRUE; +} + static void bus_connection_drop_pending_replies (BusConnections *connections, DBusConnection *connection) @@ -1617,61 +1650,17 @@ bus_connection_drop_pending_replies (BusConnections *connections, /* The DBusConnection is almost 100% finalized here, so you can't * do anything with it except check for pointer equality */ - DBusList *link; - _dbus_verbose ("Dropping pending replies that involve connection %p\n", connection); - - link = bus_expire_list_get_first_link (connections->pending_replies); - while (link != NULL) - { - DBusList *next; - BusPendingReply *pending; - - next = bus_expire_list_get_next_link (connections->pending_replies, - link); - pending = link->data; - - if (pending->will_get_reply == connection) - { - /* We don't need to track this pending reply anymore */ - - _dbus_verbose ("Dropping pending reply %p, replier %p receiver %p serial %u\n", - pending, - pending->will_send_reply, - pending->will_get_reply, - pending->reply_serial); - - bus_expire_list_remove_link (connections->pending_replies, - link); - bus_pending_reply_free (pending); - } - else if (pending->will_send_reply == connection) - { - /* The reply isn't going to be sent, so set things - * up so it will be expired right away - */ - _dbus_verbose ("Will expire pending reply %p, replier %p receiver %p serial %u\n", - pending, - pending->will_send_reply, - pending->will_get_reply, - pending->reply_serial); - - pending->will_send_reply = NULL; - pending->expire_item.added_tv_sec = 0; - pending->expire_item.added_tv_usec = 0; - bus_expire_list_recheck_immediately (connections->pending_replies); - } - - link = next; - } + bus_expire_list_foreach (connections->pending_replies, connection, + drop_caller_cb, drop_callee_cb, connections); } typedef struct { - BusPendingReply *pending; + BusExpireItem *pending; BusConnections *connections; } CancelPendingReplyData; @@ -1683,7 +1672,7 @@ cancel_pending_reply (void *data) _dbus_verbose ("d = %p\n", d); if (!bus_expire_list_remove (d->connections->pending_replies, - &d->pending->expire_item)) + d->pending)) _dbus_assert_not_reached ("pending reply did not exist to be cancelled"); bus_pending_reply_free (d->pending); /* since it's been cancelled */ @@ -1715,11 +1704,9 @@ bus_connections_expect_reply (BusConnections *connections, DBusMessage *reply_to_this, DBusError *error) { - BusPendingReply *pending; + BusExpireItem *pending; dbus_uint32_t reply_serial; - DBusList *link; CancelPendingReplyData *cprd; - int count; _dbus_assert (will_get_reply != NULL); _dbus_assert (will_send_reply != NULL); @@ -1730,28 +1717,18 @@ bus_connections_expect_reply (BusConnections *connections, reply_serial = dbus_message_get_serial (reply_to_this); - link = bus_expire_list_get_first_link (connections->pending_replies); - count = 0; - while (link != NULL) + pending = bus_expire_list_lookup (connections->pending_replies, + will_get_reply, will_send_reply, + reply_serial); + if (pending) { - pending = link->data; - - if (pending->reply_serial == reply_serial && - pending->will_get_reply == will_get_reply && - pending->will_send_reply == will_send_reply) - { - dbus_set_error (error, DBUS_ERROR_ACCESS_DENIED, - "Message has the same reply serial as a currently-outstanding existing method call"); - return FALSE; - } - - link = bus_expire_list_get_next_link (connections->pending_replies, - link); - if (pending->will_get_reply == will_get_reply) - ++count; + dbus_set_error (error, DBUS_ERROR_ACCESS_DENIED, + "Message has the same reply serial as a currently-outstanding existing method call"); + return FALSE; } - - if (count >= + + if (bus_expire_list_item_count_per_caller (connections->pending_replies, + will_get_reply) >= bus_context_get_max_replies_per_connection (connections->context)) { dbus_set_error (error, DBUS_ERROR_LIMITS_EXCEEDED, @@ -1759,7 +1736,7 @@ bus_connections_expect_reply (BusConnections *connections, return FALSE; } - pending = dbus_new0 (BusPendingReply, 1); + pending = dbus_new0 (BusExpireItem, 1); if (pending == NULL) { BUS_SET_OOM (error); @@ -1768,8 +1745,8 @@ bus_connections_expect_reply (BusConnections *connections, #ifdef DBUS_ENABLE_VERBOSE_MODE /* so we can see a not-yet-added pending reply */ - pending->expire_item.added_tv_sec = 1; - pending->expire_item.added_tv_usec = 1; + pending->added_tv_sec = 1; + pending->added_tv_usec = 1; #endif pending->will_get_reply = will_get_reply; @@ -1785,7 +1762,7 @@ bus_connections_expect_reply (BusConnections *connections, } if (!bus_expire_list_add (connections->pending_replies, - &pending->expire_item)) + pending)) { BUS_SET_OOM (error); dbus_free (cprd); @@ -1799,7 +1776,7 @@ bus_connections_expect_reply (BusConnections *connections, cancel_pending_reply_data_free)) { BUS_SET_OOM (error); - bus_expire_list_remove (connections->pending_replies, &pending->expire_item); + bus_expire_list_remove (connections->pending_replies, pending); dbus_free (cprd); bus_pending_reply_free (pending); return FALSE; @@ -1808,8 +1785,8 @@ bus_connections_expect_reply (BusConnections *connections, cprd->pending = pending; cprd->connections = connections; - _dbus_get_monotonic_time (&pending->expire_item.added_tv_sec, - &pending->expire_item.added_tv_usec); + _dbus_get_monotonic_time (&pending->added_tv_sec, + &pending->added_tv_usec); _dbus_verbose ("Added pending reply %p, replier %p receiver %p serial %u\n", pending, @@ -1822,45 +1799,42 @@ bus_connections_expect_reply (BusConnections *connections, typedef struct { - DBusList *link; - BusConnections *connections; + BusExpireItem *item; + BusConnections *connections; } CheckPendingReplyData; static void cancel_check_pending_reply (void *data) { CheckPendingReplyData *d = data; + BusExpireItem *item = d->item; _dbus_verbose ("d = %p\n",d); - bus_expire_list_add_link (d->connections->pending_replies, - d->link); - d->link = NULL; + bus_expire_list_restore (d->connections->pending_replies, + item); + d->item = NULL; } static void check_pending_reply_data_free (void *data) { CheckPendingReplyData *d = data; + BusExpireItem *pending = d->item; _dbus_verbose ("d = %p\n",d); - if (d->link != NULL) + if (pending != NULL) { - BusPendingReply *pending = d->link->data; - - _dbus_assert (!bus_expire_list_contains_item (d->connections->pending_replies, - &pending->expire_item)); - + bus_expire_list_remove (d->connections->pending_replies, pending); bus_pending_reply_free (pending); - _dbus_list_free_link (d->link); } dbus_free (d); } /* - * Check whether a reply is allowed, remove BusPendingReply + * Check whether a reply is allowed, remove BusExpireItem * if so, return TRUE if so. */ dbus_bool_t @@ -1872,7 +1846,7 @@ bus_connections_check_reply (BusConnections *connections, DBusError *error) { CheckPendingReplyData *cprd; - DBusList *link; + BusExpireItem *pending; dbus_uint32_t reply_serial; _dbus_assert (sending_reply != NULL); @@ -1880,24 +1854,14 @@ bus_connections_check_reply (BusConnections *connections, reply_serial = dbus_message_get_reply_serial (reply); - link = bus_expire_list_get_first_link (connections->pending_replies); - while (link != NULL) + pending = bus_expire_list_lookup (connections->pending_replies, + receiving_reply, sending_reply, + reply_serial); + if (pending) { - BusPendingReply *pending = link->data; - - if (pending->reply_serial == reply_serial && - pending->will_get_reply == receiving_reply && - pending->will_send_reply == sending_reply) - { - _dbus_verbose ("Found pending reply with serial %u\n", reply_serial); - break; - } - - link = bus_expire_list_get_next_link (connections->pending_replies, - link); + _dbus_verbose ("Found pending reply with serial %u\n", reply_serial); } - - if (link == NULL) + else { _dbus_verbose ("No pending reply expected\n"); @@ -1921,14 +1885,11 @@ bus_connections_check_reply (BusConnections *connections, return FALSE; } - cprd->link = link; + cprd->item = pending; cprd->connections = connections; - bus_expire_list_unlink (connections->pending_replies, - link); + bus_expire_list_mark_deleted (connections->pending_replies, cprd->item); - _dbus_assert (!bus_expire_list_contains_item (connections->pending_replies, link->data)); - return TRUE; } diff --git a/bus/expirelist.c b/bus/expirelist.c index 9a3886e..4ed29c1 100644 --- a/bus/expirelist.c +++ b/bus/expirelist.c @@ -27,10 +27,41 @@ #include #include #include +#include struct BusExpireList { - DBusList *items; /**< List of BusExpireItem */ + /* hash: + * - key: DBusConnection *caller + * - value: hash: + * - key: DBusConnection *callee + * - value: hash: + * - key: int reply_serial + * - value: BusExpireItem *item + */ + DBusHashTable *caller; + + /* hash: + * - key: DBusConnection *callee + * - value: hash: + * - key: DBusConnection *caller + * - value: hash: + * - key: int reply_serial + * - value: BusExpireItem *item + */ + DBusHashTable *callee; + + /* hash: + * - key: DBusConnection *caller + * - value: dbus_uint32_t count + * + * Useful to implement max_replies_per_connection + */ + DBusHashTable *item_count_per_caller; + + int deleted_items_count; + int non_deleted_items_count; + DBusTimeout *timeout; DBusLoop *loop; BusExpireFunc expire_func; @@ -40,6 +71,14 @@ struct BusExpireList static dbus_bool_t expire_timeout_handler (void *data); +static void +_safe_hash_unref (DBusHashTable *table) +{ + if (table) + _dbus_hash_table_unref (table); +} + + BusExpireList* bus_expire_list_new (DBusLoop *loop, int expire_after, @@ -57,6 +96,13 @@ bus_expire_list_new (DBusLoop *loop, list->loop = loop; list->expire_after = expire_after; + list->caller = _dbus_hash_table_new (DBUS_HASH_UINTPTR, + NULL, (DBusFreeFunction) _safe_hash_unref); + list->callee = _dbus_hash_table_new (DBUS_HASH_UINTPTR, + NULL, (DBusFreeFunction) _safe_hash_unref); + list->item_count_per_caller = _dbus_hash_table_new (DBUS_HASH_UINTPTR, + NULL, NULL); + list->timeout = _dbus_timeout_new (100, /* irrelevant */ expire_timeout_handler, list, NULL); @@ -82,7 +128,9 @@ bus_expire_list_new (DBusLoop *loop, void bus_expire_list_free (BusExpireList *list) { - _dbus_assert (list->items == NULL); + _dbus_hash_table_unref (list->caller); + _dbus_hash_table_unref (list->callee); + _dbus_hash_table_unref (list->item_count_per_caller); _dbus_loop_remove_timeout (list->loop, list->timeout); @@ -122,64 +170,126 @@ bus_expire_list_recheck_immediately (BusExpireList *list) bus_expire_timeout_set_interval (list->timeout, 0); } +static void +_bus_expire_list_cleanup_helper (BusExpireList *list, + DBusHashTable *hash1, + DBusFreeFunction f) +{ + DBusHashIter iter1; + + _dbus_hash_iter_init (hash1, &iter1); + while (_dbus_hash_iter_next (&iter1)) + { + DBusHashTable *hash2 = _dbus_hash_iter_get_value (&iter1); + DBusHashIter iter2; + _dbus_hash_iter_init (hash2, &iter2); + while (_dbus_hash_iter_next (&iter2)) + { + DBusHashTable *hash3 = _dbus_hash_iter_get_value (&iter2); + DBusHashIter iter3; + _dbus_hash_iter_init (hash3, &iter3); + while (_dbus_hash_iter_next (&iter3)) + { + BusExpireItem *item = _dbus_hash_iter_get_value (&iter3); + + if (!item->deleted) + continue; + + /* It is safe to remove the entry while we walk the iterator + * but not to remove iter2 while we are in the iter3 loop. + */ + if (f) + f (item); + _dbus_hash_iter_remove_entry (&iter3); + } + if (_dbus_hash_table_get_n_entries (hash3) == 0) + { + _dbus_hash_iter_remove_entry (&iter2); + } + } + if (_dbus_hash_table_get_n_entries (hash2) == 0) + { + _dbus_hash_iter_remove_entry (&iter1); + } + } + +} + static int do_expiration_with_monotonic_time (BusExpireList *list, long tv_sec, long tv_usec) { - DBusList *link; + DBusHashIter iter1; int next_interval, min_wait_time, items_to_expire; next_interval = -1; min_wait_time = 3600 * 1000; /* this is reset anyway if used */ items_to_expire = 0; - - link = _dbus_list_get_first_link (&list->items); - while (link != NULL) - { - DBusList *next = _dbus_list_get_next_link (&list->items, link); - double elapsed; - BusExpireItem *item; - - item = link->data; - - elapsed = ELAPSED_MILLISECONDS_SINCE (item->added_tv_sec, - item->added_tv_usec, - tv_sec, tv_usec); - if (((item->added_tv_sec == 0) && (item->added_tv_usec == 0)) || - ((list->expire_after > 0) && (elapsed >= (double) list->expire_after))) + _dbus_hash_iter_init (list->caller, &iter1); + while (_dbus_hash_iter_next (&iter1)) + { + DBusHashTable *hash2 = _dbus_hash_iter_get_value (&iter1); + DBusHashIter iter2; + _dbus_hash_iter_init (hash2, &iter2); + while (_dbus_hash_iter_next (&iter2)) { - _dbus_verbose ("Expiring an item %p\n", item); - - /* If the expire function fails, we just end up expiring - * this item next time we walk through the list. This would - * be an indeterminate time normally, so we set up the - * next_interval to be "shortly" (just enough to avoid - * a busy loop) - */ - if (!(* list->expire_func) (list, link, list->data)) + DBusHashTable *hash3 = _dbus_hash_iter_get_value (&iter2); + DBusHashIter iter3; + _dbus_hash_iter_init (hash3, &iter3); + while (_dbus_hash_iter_next (&iter3)) { - next_interval = _dbus_get_oom_wait (); - break; + BusExpireItem *item = _dbus_hash_iter_get_value (&iter3); + double elapsed; + + if (item->deleted) + continue; + + elapsed = ELAPSED_MILLISECONDS_SINCE (item->added_tv_sec, + item->added_tv_usec, + tv_sec, tv_usec); + + if (((item->added_tv_sec == 0) && (item->added_tv_usec == 0)) || + ((list->expire_after > 0) && (elapsed >= (double) list->expire_after))) + { + _dbus_verbose ("Expiring an item %p\n", item); + + /* If the expire function fails, we just end up expiring + * this item next time we walk through the list. This would + * be an indeterminate time normally, so we set up the + * next_interval to be "shortly" (just enough to avoid + * a busy loop) + */ + if (!(* list->expire_func) (list, item, list->data)) + { + next_interval = _dbus_get_oom_wait (); + break; + } + } + else if (list->expire_after > 0) + { + double to_wait; + + items_to_expire = 1; + to_wait = (double) list->expire_after - elapsed; + if (min_wait_time > to_wait) + min_wait_time = to_wait; + } } } - else if (list->expire_after > 0) - { - double to_wait; - - items_to_expire = 1; - to_wait = (double) list->expire_after - elapsed; - if (min_wait_time > to_wait) - min_wait_time = to_wait; - } - - link = next; } if (next_interval < 0 && items_to_expire) next_interval = min_wait_time; + if (list->deleted_items_count == 0) + return next_interval; + + _bus_expire_list_cleanup_helper (list, list->caller, NULL); + _bus_expire_list_cleanup_helper (list, list->callee, dbus_free); + list->deleted_items_count = 0; + return next_interval; } @@ -190,7 +300,7 @@ bus_expirelist_expire (BusExpireList *list) next_interval = -1; - if (list->items != NULL) + if (list->non_deleted_items_count > 0) { long tv_sec, tv_usec; @@ -215,25 +325,153 @@ expire_timeout_handler (void *data) return TRUE; } -void -bus_expire_list_remove_link (BusExpireList *list, - DBusList *link) +static dbus_bool_t +_bus_expire_list_remove_helper (DBusHashTable *hash1, + void *key1, void *key2, + BusExpireItem *item) { - _dbus_list_remove_link (&list->items, link); + DBusHashIter iter1; + DBusHashIter iter2; + DBusHashIter iter3; + DBusHashTable *hash2; + DBusHashTable *hash3; + dbus_bool_t found; + + found = _dbus_hash_iter_lookup (hash1, key1, + FALSE, &iter1); + if (!found) + return FALSE; + + hash2 = _dbus_hash_iter_get_value (&iter1); + found = _dbus_hash_iter_lookup (hash2, key2, + FALSE, &iter2); + if (!found) + return FALSE; + + hash3 = _dbus_hash_iter_get_value (&iter2); + found = _dbus_hash_iter_lookup (hash3, + _DBUS_INT_TO_POINTER (item->reply_serial), + FALSE, &iter3); + if (!found) + return FALSE; + + _dbus_assert (item == _dbus_hash_iter_get_value (&iter3)); + + _dbus_hash_iter_remove_entry (&iter3); + if (_dbus_hash_table_get_n_entries (hash3) == 0) + _dbus_hash_iter_remove_entry (&iter2); + if (_dbus_hash_table_get_n_entries (hash2) == 0) + _dbus_hash_iter_remove_entry (&iter1); + + return TRUE; } dbus_bool_t bus_expire_list_remove (BusExpireList *list, BusExpireItem *item) { - return _dbus_list_remove (&list->items, item); + DBusHashIter iter; + dbus_bool_t found; + int value; + + found = _bus_expire_list_remove_helper (list->caller, item->will_get_reply, + item->will_send_reply, item); + if (!found) + return FALSE; + + found = _bus_expire_list_remove_helper (list->callee, item->will_send_reply, + item->will_get_reply, item); + _dbus_assert (found); + + found = _dbus_hash_iter_lookup (list->item_count_per_caller, + item->will_get_reply, + FALSE, &iter); + _dbus_assert (found); + value = _DBUS_POINTER_TO_INT (_dbus_hash_iter_get_value (&iter)); + value += 1; + _dbus_hash_iter_set_value (&iter, _DBUS_INT_TO_POINTER (value)); + + return TRUE; } void -bus_expire_list_unlink (BusExpireList *list, - DBusList *link) +bus_expire_list_mark_deleted (BusExpireList *list, + BusExpireItem *item) +{ + _dbus_assert (item != NULL); + _dbus_assert (!item->deleted); + + item->deleted = TRUE; + list->deleted_items_count += 1; +} + +static dbus_bool_t +_bus_expire_list_add_helper (DBusHashTable *hash1, + void *key1, void *key2, + BusExpireItem *item, + DBusHashIter *rollback_iter) { - _dbus_list_unlink (&list->items, link); + DBusHashIter iter1; + DBusHashIter iter2; + DBusHashIter iter3; + DBusHashTable *hash2; + DBusHashTable *hash3; + dbus_bool_t found, ret; + + _dbus_assert (hash1); + found = _dbus_hash_iter_lookup (hash1, key1, + FALSE, &iter1); + if (found) + { + hash2 = _dbus_hash_iter_get_value (&iter1); + _dbus_assert (hash2); + } + else + { + hash2 = _dbus_hash_table_new (DBUS_HASH_UINTPTR, + NULL, (DBusFreeFunction) _safe_hash_unref); + if (!hash2) + goto oom; + ret = _dbus_hash_table_insert_uintptr (hash1, + _DBUS_POINTER_TO_INT (key1), hash2); + if (!ret) + goto oom; + } + + found = _dbus_hash_iter_lookup (hash2, key2, + FALSE, &iter2); + if (found) + { + hash3 = _dbus_hash_iter_get_value (&iter2); + _dbus_assert (hash3); + } + else + { + hash3 = _dbus_hash_table_new (DBUS_HASH_INT, NULL, NULL); + if (!hash3) + goto oom; + ret = _dbus_hash_table_insert_uintptr (hash2, + _DBUS_POINTER_TO_INT (key2), hash3); + if (!ret) + goto oom; + } + + found = _dbus_hash_iter_lookup (hash3, + _DBUS_INT_TO_POINTER (item->reply_serial), FALSE, &iter3); + _dbus_assert (!found); + + ret = _dbus_hash_table_insert_int (hash3, item->reply_serial, item); + + /* Make it easier to rollback the insertion */ + if (ret && rollback_iter) + _dbus_hash_iter_lookup (hash3, _DBUS_INT_TO_POINTER (item->reply_serial), + FALSE, rollback_iter); + + return ret; + +oom: + /* Empty hash table are not harmful */ + return FALSE; } dbus_bool_t @@ -241,44 +479,149 @@ bus_expire_list_add (BusExpireList *list, BusExpireItem *item) { dbus_bool_t ret; + DBusHashIter rollback_iter1, rollback_iter2; + int value; + + _dbus_assert (item != NULL); + _dbus_assert (!item->deleted); + + ret = _bus_expire_list_add_helper (list->caller, item->will_get_reply, + item->will_send_reply, item, + &rollback_iter1); + if (!ret) + return FALSE; + + ret = _bus_expire_list_add_helper (list->callee, item->will_send_reply, + item->will_get_reply, item, + &rollback_iter2); + if (!ret) + { + _dbus_hash_iter_remove_entry (&rollback_iter1); + return FALSE; + } + + value = _DBUS_POINTER_TO_INT (_dbus_hash_table_lookup_uintptr (list->item_count_per_caller, + _DBUS_POINTER_TO_INT (item->will_get_reply))); + value += 1; + ret = _dbus_hash_table_insert_uintptr (list->item_count_per_caller, + _DBUS_POINTER_TO_INT (item->will_get_reply), + _DBUS_INT_TO_POINTER (value)); + if (!ret) + { + _dbus_hash_iter_remove_entry (&rollback_iter1); + _dbus_hash_iter_remove_entry (&rollback_iter2); + return FALSE; + } - ret = _dbus_list_prepend (&list->items, item); - if (ret && !dbus_timeout_get_enabled (list->timeout)) + list->non_deleted_items_count += 1; + + if (!dbus_timeout_get_enabled (list->timeout)) bus_expire_timeout_set_interval (list->timeout, 0); - return ret; + return TRUE; } void -bus_expire_list_add_link (BusExpireList *list, - DBusList *link) +bus_expire_list_restore (BusExpireList *list, + BusExpireItem *item) { - _dbus_assert (link->data != NULL); - - _dbus_list_prepend_link (&list->items, link); + _dbus_assert (item != NULL); + _dbus_assert (item->deleted); + + item->deleted = FALSE; + + list->non_deleted_items_count -= 1; + list->deleted_items_count += 1; if (!dbus_timeout_get_enabled (list->timeout)) bus_expire_timeout_set_interval (list->timeout, 0); } -DBusList* -bus_expire_list_get_first_link (BusExpireList *list) +int +bus_expire_list_item_count_per_caller (BusExpireList *list, + DBusConnection *caller) { - return _dbus_list_get_first_link (&list->items); + void *value; + + value = _dbus_hash_table_lookup_uintptr (list->item_count_per_caller, + _DBUS_POINTER_TO_INT (caller)); + return _DBUS_POINTER_TO_INT (value); } -DBusList* -bus_expire_list_get_next_link (BusExpireList *list, - DBusList *link) +void +bus_expire_list_foreach (BusExpireList *list, + DBusConnection *conn, + BusExpireFunc caller_func, + BusExpireFunc callee_func, + void *data) { - return _dbus_list_get_next_link (&list->items, link); + DBusHashIter iter1; + + if (caller_func && _dbus_hash_iter_lookup (list->caller, conn, FALSE, &iter1)) + { + DBusHashTable *hash2 = _dbus_hash_iter_get_value (&iter1); + DBusHashIter iter2; + _dbus_hash_iter_init (hash2, &iter2); + while (_dbus_hash_iter_next (&iter2)) + { + DBusHashTable *hash3 = _dbus_hash_iter_get_value (&iter2); + DBusHashIter iter3; + _dbus_hash_iter_init (hash3, &iter3); + while (_dbus_hash_iter_next (&iter3)) + { + BusExpireItem *item = _dbus_hash_iter_get_value (&iter3); + if (item->deleted) + continue; + caller_func (list, item, data); + } + } + } + + if (callee_func && _dbus_hash_iter_lookup (list->caller, conn, FALSE, &iter1)) + { + DBusHashTable *hash2 = _dbus_hash_iter_get_value (&iter1); + DBusHashIter iter2; + _dbus_hash_iter_init (hash2, &iter2); + while (_dbus_hash_iter_next (&iter2)) + { + DBusHashTable *hash3 = _dbus_hash_iter_get_value (&iter2); + DBusHashIter iter3; + _dbus_hash_iter_init (hash3, &iter3); + while (_dbus_hash_iter_next (&iter3)) + { + BusExpireItem *item = _dbus_hash_iter_get_value (&iter3); + if (item->deleted) + continue; + callee_func (list, item, data); + } + } + } } -dbus_bool_t -bus_expire_list_contains_item (BusExpireList *list, - BusExpireItem *item) +BusExpireItem* +bus_expire_list_lookup (BusExpireList *list, + DBusConnection *caller, + DBusConnection *callee, + dbus_uint32_t reply_serial) { - return _dbus_list_find_last (&list->items, item) != NULL; + DBusHashTable *hash; + BusExpireItem *item; + + hash = _dbus_hash_table_lookup_uintptr (list->caller, + _DBUS_POINTER_TO_INT (caller)); + if (!hash) + return NULL; + + hash = _dbus_hash_table_lookup_uintptr (hash, + _DBUS_POINTER_TO_INT (callee)); + if (!hash) + return NULL; + + item = _dbus_hash_table_lookup_int (hash, reply_serial); + if (!item || item->deleted) + return NULL; + + return item; } #ifdef DBUS_ENABLE_EMBEDDED_TESTS @@ -291,12 +634,12 @@ typedef struct static dbus_bool_t test_expire_func (BusExpireList *list, - DBusList *link, + BusExpireItem *item, void *data) { TestExpireItem *t; - t = (TestExpireItem*) link->data; + t = (TestExpireItem*) item; t->expire_count += 1; diff --git a/bus/expirelist.h b/bus/expirelist.h index 887cb97..182bf2d 100644 --- a/bus/expirelist.h +++ b/bus/expirelist.h @@ -27,20 +27,31 @@ #include #include #include +#include typedef struct BusExpireList BusExpireList; typedef struct BusExpireItem BusExpireItem; typedef dbus_bool_t (* BusExpireFunc) (BusExpireList *list, - DBusList *link, + BusExpireItem *item, void *data); -/* embed this in a child expire item struct */ struct BusExpireItem { long added_tv_sec; /**< Time we were added (seconds component) */ long added_tv_usec; /**< Time we were added (microsec component) */ + + BusExpireList *set; + + DBusConnection *will_get_reply; /**< caller */ + DBusConnection *will_send_reply; /**< callee */ + dbus_uint32_t reply_serial; + + /* Items are not immediately deleted. Instead, they are marked as deleted + * and when the transaction is executed or cancelled, the deletion is + * completed or rolled back. */ + dbus_bool_t deleted; }; BusExpireList* bus_expire_list_new (DBusLoop *loop, @@ -49,21 +60,25 @@ BusExpireList* bus_expire_list_new (DBusLoop *loop, void *data); void bus_expire_list_free (BusExpireList *list); void bus_expire_list_recheck_immediately (BusExpireList *list); -void bus_expire_list_remove_link (BusExpireList *list, - DBusList *link); dbus_bool_t bus_expire_list_remove (BusExpireList *list, BusExpireItem *item); -DBusList* bus_expire_list_get_first_link (BusExpireList *list); -DBusList* bus_expire_list_get_next_link (BusExpireList *list, - DBusList *link); +void bus_expire_list_mark_deleted (BusExpireList *list, + BusExpireItem *item); dbus_bool_t bus_expire_list_add (BusExpireList *list, BusExpireItem *item); -void bus_expire_list_add_link (BusExpireList *list, - DBusList *link); -dbus_bool_t bus_expire_list_contains_item (BusExpireList *list, +void bus_expire_list_restore (BusExpireList *list, BusExpireItem *item); -void bus_expire_list_unlink (BusExpireList *list, - DBusList *link); +int bus_expire_list_item_count_per_caller (BusExpireList *list, + DBusConnection *caller); +void bus_expire_list_foreach (BusExpireList *list, + DBusConnection *conn, + BusExpireFunc caller_func, + BusExpireFunc callee_func, + void *data); +BusExpireItem* bus_expire_list_lookup (BusExpireList *list, + DBusConnection *caller, + DBusConnection *callee, + dbus_uint32_t reply_serial); /* this macro and function are semi-related utility functions, not really part of the * BusExpireList API -- 1.8.5.3