@z This file was created by Aleksandar Donev as part of the Network Optimization project. Feel free to use any portion of it and contact me at donev@pa.msu.edu @x \Title{Skip-Lists with |findmin| extensions} \author{Aleksandar Donev} \date{February 2000} @*0 Module |Skip_Lists|. This module is an implementation of a probabilistic search data structure called a {\bf skip list}. Skip lists are sorted linked lists (ordered lists) in which search takes a logarithmic time. The implementation is not optimized in many respects, and is not finished either, but it has some points worth taking. The primary goal of the implementation is to provide the tools needed to manipulate a small collection (set) of skip lists which have size of the same order (and this size should be a large number, since the implementation has large overheads per list), under the operations of merging, concatentation, splitting, deletion, insertion and of course to provide efficient search operations. At present the splitting, merging and concatenation operations have not yet been implemented, but this can be done easily using the structures and macros developed here. The primary operations are |InsertInSkipLists|, |DeleteFromSkipList| and |SearchInSkipList|. All of these are generic procedures overloaded for different types of keys. One of the major optimizations performed here is the internal handling of memory allocation. Using the run-time system to allocate and deallocate the nodes one by one has severe overheads. Here I implement a paging system for node allocation, which is not quite optimized, but is rather efficient and a good example of what one needs to do in serious implementations. @ @f data _data @f block _block @a @@; @@; @@; @*1 Key Data Types. The primary component of a node is its key, which can be of different types here, and is boxed inside the derived type |_key_type|. Only single integer keys are used here, but the macros below allow one to implement other types easily. Another component of a node is the data associated with the node. This can be anything. If it is a pointer to an object, the list is {\em exogenous} and in Fortran 200x one can make these pointers generic and thus implement generic skip lists (but the keys are fixed and are overloaded with generic interfaces here, which makes the whole implementation fast). If the data is an actual object, then the list is {\endogenous} (but generic lists can not be implemented this way with this module). I decided to box the key and the data together into one type, and box the key itself into another type (to avoid naming troubles). All routines here have names after the type of key and are then overloaded generically: @ @m _DeclareSingleKeyType(_key_type, _type, _kind, initial_value, _Data, ...) @; _type(KIND=_kind), PARAMETER, PUBLIC :: huge_@e@&_key_type=HUGE(huge_@e@&_key_type) @; PUBLIC :: _key_type @; TYPE _key_type @; _type(KIND=_kind) :: my_value @; ENDTYPE @; PUBLIC :: Data_@e@&_key_type @; // A node has a key and maybe additional data TYPE Data_@e@&_key_type @; _TYPE(_key_type) :: my_key @; // The search key @e#!_Data(#.) @; // If we need additional values ENDTYPE @; @ @= MODULE Key_Types @; USE Precision @; IMPLICIT NONE @; PRIVATE @; _DeclareSingleKeyType(Key_i_wp, @E INTEGER, i_wp, huge_Key_i_wp, _Dummy) @; _DeclareSingleKeyType(Key_i_wp_r_wp, @E INTEGER, i_wp, huge_Key_i_wp, _Declare_r_wp, my_cost) @; END MODULE Key_Types @; @*1 Node Data Types. To make a node, other than its datum (key+other data), we need its next-in-the-list pointers. For a skip list a node has not one, but several next pointers, arranged in levels, and this type does not need to be stored explicitly (the algorithm never references a level higher then what was allocated during insertion of the node into the list). However, here I {\bf preallocate} the nodes and their next pointers, so each node has a fixed largest possible level |my_max_level| (the actual node level can be capped to a number smaller then this. This way each page can esentially support an infinite skip list, since the maximum level is not capped at all). The preallocated nodes will be organized in {\em pages}, whose allocation will be managed internally. So each node needs to store it's page number |my_page|. A typical declaration of a next field for a skip list would be: \\ |_TYPE(NodePointer), DIMENSION(:), _DYNAMIC :: my_nexts=_NULL| \\ where |_DYNAMIC| is either |POINTER| or |ALLOCATABLE| (in F2x), preferably the second one. This declaration makes coding very easy, but it is extremely memory inefficient, since along with the actual node pointers a whole array descriptor needs to be stored as well, which is more memory then usually needed for the pointers themselves. Therefore here I preallocate pages of node pointers and then each node gets a piece of this array starting at the index |my_next|. If Fortran had an equivalent to C's pointers, then one could make a declaration such as:\\ |_TYPE(NodePointer), DIMENSION(*), POINTER :: my_nexts=_NULL| \\ and then associate |my_nexts=>page_of_pointers[my_next:my_next+my_max_level]| and this would completely separate the allocation system from the skip-list implementation. However, Fortran does not offer such contiguous pointers and so here I use macros to make the implementation of the search routines independent of the allocation system. @ @m _DYNAMIC POINTER @m _NOT_NULL ASSOCIATED @f _DYNAMIC POINTER @f _NOT_NULL ASSOCIATED @ @m _DeclareNodeType(_key_type) @; PUBLIC :: Node_@e@&_key_type @; TYPE Node_@e@&_key_type @; _PRIVATE @; _TYPE(Data_@e@&_key_type) :: my_data @; // The key and data of the node INTEGER(KIND=i_byte) :: my_max_level=1, my_page=1 @; // The allocated level and page number INTEGER(KIND=i_wp) :: my_next=0 @; // Zero-index into the block of nexts ENDTYPE @; PUBLIC :: NodePointer_@e@&_key_type @; TYPE NodePointer_@e@&_key_type @; _PRIVATE @; _TYPE(Node_@e@&_key_type), POINTER :: my_node=_NULL @; ENDTYPE @; @ I will not explain the memory system in any detail. The main type |MemoryMapOfNodes| associates some memory to a given type of preallocated nodes (meaning nodes of a given type and with a given probability $p$ for the next pointers). This memory can then be shared by several skip-lists, and the number of skip-lists should be kept small if efficiency is desired. The memory is organized in pages, which contain preallocated blocks of nodes and next pointers, and pages will later be allocated and deallocated as needed: @ @m _DeclareMemoryType(_key_type) @; TYPE BlockOfNodes_@e@&_key_type @; _TYPE(Node_@e@&_key_type), DIMENSION(:), _DYNAMIC :: my_nodes=_NULL @; _TYPE(NodePointer_@e@&_key_type), DIMENSION(:), _DYNAMIC :: my_pointers=_NULL @; INTEGER(KIND=i_wp) :: my_n_nodes=0 @; // Counter ENDTYPE @; PUBLIC :: PageOfNodes_@e@&_key_type @; TYPE PageOfNodes_@e@&_key_type @; // For now each page will only have one block _PRIVATE @; _TYPE(BlockOfNodes_@e@&_key_type) :: my_block @; _TYPE(NodePointer_@e@&_key_type) :: my_free_node @; // Each page has its stack of free nodes INTEGER(KIND=i_byte) :: my_next=0 @; // Pages are linked in a stack ENDTYPE @; PUBLIC :: MemoryMapOfNodes_@e@&_key_type @; // The user needs access to these TYPE MemoryMapOfNodes_@e@&_key_type @; // A collection of memory pages _TYPE(PageOfNodes_@e@&_key_type), DIMENSION(:), _DYNAMIC :: my_pages=_NULL @; REAL(KIND=r_sp) :: my_p=0.25_r_sp, my_log_p=-1.386294_r_sp @; // The probability $p$ INTEGER(KIND=i_wp) :: my_page_size=100 @; // How many nodes per page INTEGER(KIND=i_byte) :: my_n_lists=0, my_max_lists=10 @; // How many lists will use this memory INTEGER(KIND=i_byte) :: my_max_pages=10, my_n_pages=0 @; // Estimated number of needed pages INTEGER(KIND=i_byte) :: my_free_page=0 @; // Top of stack of free pages ENDTYPE @; @ Each skip list has some counters and pointers associated with it, which are packaged in the data-type |SkipList|. This includes the virtual memory system associated with the skip list, the dummy (sentinel) head and tail nodes, etc. Some of these variables are overhead, but since usually the skip lists will contain many nodes, this is not too bad: @ @m _DeclareSkipList(_key_type) @; _DeclareNodeType(_key_type) @; _DeclareMemoryType(_key_type) @; PUBLIC :: SkipList_@e@&_key_type @; TYPE SkipList_@e@&_key_type @; _TYPE(Node_@e@&_key_type), POINTER :: my_head=_NULL, my_tail=_NULL @; // Sentinel nodes _TYPE(NodePointer_@e@&_key_type), DIMENSION(:), _DYNAMIC :: my_search_finger=_NULL @; _TYPE(MemoryMapOfNodes_@e@&_key_type), POINTER :: my_memory=_NULL @; // Virtual memory INTEGER(KIND=i_byte) :: my_n_levels=0, my_max_levels=0 @; INTEGER(KIND=i_wp) :: my_n_elements=0, my_max_elements=1000 @; LOGICAL(KIND=l_short) :: my_use_search_finger=.TRUE., my_keep_duplicates=.FALSE. , & my_trace_allocation=.TRUE. @; // Indicators ENDTYPE @; @ To make my life (read: typing) easier and to separate the exact organization of the data types from the rest of the algorith, I declare a bunch of macros here whose names start with an underscore: @ @m _key(node) node%my_data%my_key%my_value @%% @m _data(node) node%my_data @%% @m _page(node) node%my_page @%% @m _max_level(node) node%my_max_level @%% @m _next(node) node%my_next @%% @m _memory skip_list%my_memory @%% @m _search_finger skip_list%my_search_finger @%% @m _use_search_finger skip_list%my_use_search_finger @%% @m _keep_duplicates skip_list%my_keep_duplicates @%% @m _trace_allocation skip_list%my_trace_allocation @%% @m _tail skip_list%my_tail @%% @m _head skip_list%my_head @%% @m _max_levels skip_list%my_max_levels @%% @m _n_levels skip_list%my_n_levels @%% @m _max_elements skip_list%my_max_elements @%% @m _n_elements skip_list%my_n_elements @%% @m _p _memory%my_p @%% @m _log_p _memory%my_log_p @%% @m _page_size _memory%my_page_size @%% @m _max_pages _memory%my_max_pages @%% @m _max_lists _memory%my_max_lists @%% @m _n_lists _memory%my_n_lists @%% @m _n_pages _memory%my_n_pages @%% @m _free_node(page) _memory%my_pages[page]%my_free_node%my_node @%% @m _free_page _memory%my_free_page @%% @m _pages _memory%my_pages @%% @m _block(page) _pages[page]%my_block @%% @m _n_nodes(page) _block(page)%my_n_nodes @%% @m _nodes_block(page) _block(page)%my_nodes @%% @m _nexts_block(page) _block(page)%my_pointers @%% @ A further macro layer are the macros that take a finger pointer and associate it or with it some node pointer (for a given layer), and macros that find and associate the next pointer for a node at a given node. All algorithms will be written in terms of these four macros, thus making changing some of the above data types easy: @ @m _FindFinger(search_finger, node, level) @; node=>search_finger[level]%my_node @; @m _AssociateFinger(search_finger, node, level) @; search_finger[level]%my_node=>node @; @m _FindNext(node, next_node, level) @; @%% next_node=>node%my_next[level]%my_node @; next_node=>_nexts_block(_page(node))[_next(node)+level]%my_node @; @m _AssociateNext(node, next_node, level) @; @%% node%my_next[level]%my_node=>next_node @; _nexts_block(_page(node))[_next(node)+level]%my_node=>next_node @; @m _FindLevel(node, level) @; @%% level=SIZE(node%my_next) @; level=1 @; // The actual level is not stored! @ And finally, here is the organization of the |Skip_Lists| module: @ @= MODULE Skip_Lists @; USE Precision @; USE Error_Handling @; USE System_Monitors @; USE Random_Numbers @; USE Key_Types @; IMPLICIT NONE @; PUBLIC :: InitializeSkipList, DestroySkipList, & SearchInSkipList, InsertInSkipList, DeleteFromSkipList, & PrintSkipList @; PRIVATE @; _DeclareSkipList(Key_i_wp) @; _DeclareSkipList(Key_i_wp_r_wp) @; @@; CONTAINS @; @@; @@; @@; @@; @@; @@; @@; END MODULE Skip_Lists @; @ And two more auxilliary macros I will use, along with local variable declarations that will be in common to all the routines in this module: @ @m _GenerateProcedureBody(Procedure, ...) @; _@e##Procedure(Key_i_wp, _CompareSingleKeys, _TestEqualitySingleKeys, #.) @; _@e##Procedure(Key_i_wp_r_wp, _CompareSingleKeys, _TestEqualitySingleKeys, #.) @; @ @m _GenerateGenericInterface(Procedure) @; INTERFACE Procedure @; MODULE PROCEDURE Procedure@e@&_Key_i_wp @; MODULE PROCEDURE Procedure@e@&_Key_i_wp_r_wp @; END INTERFACE @; @m _DeclareAuxilliaryVariables(_key_type) @; _TYPE(Node_@e@&_key_type), POINTER :: node, next_node, search_node @; INTEGER(KIND=i_byte) :: level, node_level @; LOGICAL :: comparison @; @*0 Allocation Menagement for Skip Lists. Each node in the skip list has a random number of pointers to nodes following it in the skip list. The maximum node level needed to manipulate $n$ nodes in logarithmic time is denoted here with $L(n)$. There are several ways to generate a random node level (from a binomial distribution), some better when the node level is large (this involves an expensive logaritm), others better for small levels (several random number generations, but no logarithm). I use the one suitable for large lists, but it is possible to switch between the two: @ @m _L(n, log_p) MAX(INT(-LOG(@E REAL(n, KIND=r_wp))/log_p), 1) @%% @m _RandomLevelLarge(level, log_p) @; CALL RandomUniform(dice) @; level=MAX(1_i_byte, CEILING(LOG(dice)/log_p, KIND=i_byte)) @; @m _RandomLevelSmall(level, p) @; level=1 @; DO @; CALL RandomUniform(dice) @; IF(dice>=p) @~ EXIT @; level=level+1 @; END DO @; @m _GenerateNodeLevel(skip_list, n_levels) @; _RandomLevelLarge(n_levels, _log_p) @; // Generate a random level @*1 Stack-Based Allocation System. The allocator developed here uses a stack-based system to minimize memory overhead and memory fragmentation. Namely, when a node is deleted, it is placed on a stack of free nodes (there is one stack for each page in the virtual memory). Since we already have next pointers, this does not cost us any additional memory. Then when a new node needs to be allocated, the first thing to check is whether there are any previously deallocated nodes on the stack. If not, there may be still untouched nodes in the page, and if not, then a new page may need to be allocated. Here pages that have open spots in them are also placed on a stack, so that a page is filled completely before taken off the stack. This will tend to minimize memory fragmentation when data-locality or locality-of-reference is present, but other schemes, such as keeping a heap of pages sorted on the number of free spots. The macros below are the ones that perform the stack operations. I can not fully explain the system here, but the general principles should be evident from looking at the code and comments: @ @m _PushOnNodeStack(node, stack) @; _AssociateNext(node, stack, 1) @; stack=>node @; @m _PopOffNodeStack(node, stack) @; _FindNext(stack, next_node, 1) @; node=>stack @; stack=>next_node @; @m _PushOnPageStack(page, stack) @; @%% WRITE(*,*) "Pushing page #", page, " onto stack with n_pages=", _n_pages @; _pages[page]%my_next=stack @; stack=page @; @m _PopOffPageStack(page, stack, next_page) @; @%% WRITE(*,*) "Popping page #", stack, " from stack with n_pages=", _n_pages @; next_page=_pages[stack]%my_next @; page=stack @; stack=next_page @; @*2 Node Allocation. The routine that allocates nodes is the most complicated. It has to first look for a page that has empty nodes, and if there is no such page, allocate one. All pages may be full, in which case these need to be reallocated. Emtpy nodes are popped from a stack of empty nodes maintained on each page. Head and tail nodes must be handled separately, because their node levels are not random but rather fixed. So here the sentinel page number 0 is used to store heads and tails and is managed by a separate piece of code. The routine is somewhat clumsy and can be optimized further, but I think the basic ideas are promissing: @ @m _AllocateNode(_key_type, ...) @; SUBROUTINE AllocateNode_@e@&_key_type(skip_list, new_node) @; _TYPE(SkipList_@e@&_key_type), POINTER :: skip_list @; _TYPE(Node_@e@&_key_type), POINTER :: new_node @; _DeclareAuxilliaryVariables(_key_type) @; // We may need some reallocation buffers for this procedure _TYPE(PageOfNodes_@e@&_key_type), DIMENSION(:), ALLOCATABLE :: temp_pages @; _TYPE(Node_@e@&_key_type), DIMENSION(:), ALLOCATABLE :: temp_nodes @; _TYPE(NodePointer_@e@&_key_type), DIMENSION(:), ALLOCATABLE :: temp_pointers @; REAL :: dice @; // For random level generation INTEGER(KIND=i_byte) :: page, temp_page @; INTEGER(KIND=i_wp) :: n_nodes, n_pointers @; // Counters INTEGER :: alloc_status @; LOGICAL :: find_free_node, get_next_node, get_next_page, new_page, reallocate_pages @; // Action selectors--to avoid compex nested |IF| constructs @%% WRITE(*,*) "Allocating new node for list with n_elements=", _n_elements @; HeadAndTail: IF(_n_elements<=1) THEN @; // This is still the head and tail node IF(_n_elements==0) THEN @; _n_lists=_n_lists+1 @; IF(_trace_allocation) @~ WRITE(*,*) "Allocating head for list #", _n_lists @; END IF @; _AllocateHeadAndTail @; RETURN @; END IF HeadAndTail @; find_free_node=.FALSE. @; // Do we need to look for a free node or is there one on the stacks page=_free_page @; IF(page!=0) THEN @; // There are no pages on the stack IF(.NOT.ASSOCIATED(_free_node(page))) @~ find_free_node=.TRUE. @; ELSE @; find_free_node=.TRUE. @; END IF @; FindFreeNode: IF(find_free_node) @; reallocate_pages=.FALSE. @; get_next_node=.FALSE. @; get_next_page=.FALSE. @; new_page=.FALSE. @; FindFreePage: DO @; // Look for a page that has empty nodes IF(page==0) THEN @; // All allocated pages have been filled @%% WRITE(*,*) "Page stack is empty!" @; n_nodes=HUGE(1_i_wp) @; // Search for page with least active nodes CheckAllPages: DO temp_page=1, _n_pages @; CheckForOpenings: IF(_NOT_NULL(_nodes_block(temp_page))) THEN @; // Look if some of the full pages have opened spots IF(_n_nodes(temp_page)<_SIZE(_nodes_block(temp_page), i_wp)) THEN @; // Empty spots IF(_n_nodes(temp_page)=SIZE(_pages)) @~ reallocate_pages=.TRUE. @; // All pages are full EXIT FindFreePage @; END IF @; FullPage: IF(_NOT_NULL(_nodes_block(page))) THEN @; // Check if this page is full IF(_n_nodes(page)==_SIZE(_nodes_block(page), i_wp)) THEN @; // Page is full @%% WRITE(*,*) "Page #", page, "is full and is popped off the stack" @; _PopOffPageStack(page, _free_page, temp_page) @; page=_free_page @; // Take the next free page off the stack CYCLE FindFreePage @; // Look at the next page ELSE @; get_next_node=.TRUE. @; // There are free nodes left over EXIT FindFreePage @; END IF @; ELSE @; // This page has not been allocated yet or it was deallocated previously new_page=.TRUE. @; get_next_node=.TRUE. @; EXIT FindFreePage @; END IF FullPage @; END DO FindFreePage @; IF(reallocate_pages) THEN @; // {\bf Reallocate} |_pages| to have more pages IF(_trace_allocation) THEN @; // Record this event CALL Warning(message="Reallocation of pages in virtual memory in skip list", & caller="AllocateNode") @; END IF @; _ReallocateArray(_pages, temp_pages, 0, _n_pages+1) @; END IF @; IF(get_next_page) THEN @; @%% WRITE(*,*) "Pushing next available page #", _n_pages+1, " onto stack" @; _n_pages=_n_pages+1 @; page=_n_pages @; _PushOnPageStack(page, _free_page) @; END IF @; NewPage: IF(new_page) THEN @; // Allocate a new page of nodes and pointers IF(_trace_allocation) THEN @; WRITE(message_print_unit,*) "Allocating page of nodes and links #", page, & " of size ", _page_size, ", n_pages=", _n_pages,", n_elements=",_n_elements @; END IF @; @%% ALLOCATE(_block(page)) @; // Allocate a new page of nodes ALLOCATE(_nodes_block(page)[_page_size]) @; n_pointers=0 @; CreateNodes: DO n_nodes=1, _page_size @; node=>_nodes_block(page)[n_nodes] @; _GenerateNodeLevel(skip_list, node_level) @; _max_level(node)=node_level @; _page(node)=page @; _next(node)=n_pointers @; n_pointers=n_pointers+node_level @; END DO CreateNodes @; ALLOCATE(_nexts_block(page)[n_pointers]) @; // Allocate fields for the link pointers END IF NewPage @; IF(get_next_node) THEN @; @%% WRITE(*,*) "Taking new node from page ", page, "index ", _n_nodes(page), & @%% " with level ", _max_level(node), " and nexts at ", _next(node) @; node=>_nodes_block(page)[_n_nodes(page)+1] @; _PushOnNodeStack(node, _free_node(page)) @; // The next free node @%% _key(node)=-huge_@e@&_key_type @; // Mark this node as freed for debugging purposes END IF @; END IF FindFreeNode @; @%% WRITE(*,*) "-------> Popping new node off page #", page @; _PopOffNodeStack(new_node, _free_node(page)) @; // Take the next available node _n_nodes(page)=_n_nodes(page)+1 @; END SUBROUTINE @; // |AllocateNode| @ Heads and tails are allocated on page 0 for each skip list that uses the given memory system: @ @m _AllocateHeadAndTail @; IF(.NOT._NOT_NULL(_nodes_block(0))) THEN @; n_nodes=2*_max_lists @; n_pointers=n_nodes*_L(_max_pages*_page_size, _log_p) @; IF(_trace_allocation) THEN @; WRITE(message_print_unit,*) "Allocating page 0 of heads and tails ", & " for ", n_nodes, " sentinel nodes and ", n_pointers, " next pointers" @; END IF @; ALLOCATE(_nodes_block(0)[0:n_nodes]) @; ALLOCATE(_nexts_block(0)[0:n_pointers]) @; node=>_nodes_block(0)[0] @; // Yet another sentinel node _page(node)=0 @; _max_level(node)=0 @; _next(node)=0 @; END IF @; n_nodes=_n_nodes(0) @; node=>_nodes_block(0)[n_nodes] @; n_pointers=_next(node)+_max_level(node) @; IF( (n_nodes+1) > SIZE(_nodes_block(0)) ) THEN @; // Allocate more sentinel nodes IF(_trace_allocation) THEN @; WRITE(message_print_unit,*) "Reallocating nodes block of page 0 of heads and tails!" @; END IF @; _ReallocateArray(_nodes_block(0), temp_nodes, 1, n_nodes+1) @; END IF @; IF( (n_pointers+_max_levels) > SIZE(_nexts_block(0))) THEN @; // More next fields IF(_trace_allocation) THEN @; WRITE(message_print_unit,*) "Reallocating nexts block of page 0 of heads and tails!" @; END IF @; _ReallocateArray(_nexts_block(0), temp_pointers, 1, n_pointers+_max_levels) @; END IF @; _n_nodes(page)=_n_nodes(0)+1 @; // Add this head or tail new_node=>_nodes_block(0)[_n_nodes(page)] @; _page(new_node)=0 @; _max_level(new_node)=_max_levels @; _next(new_node)=n_pointers @; @ For some reason reallocation as given here does not work for reallocating the pages, at least with LF95, but it seems to me that this is a compiler bug, not a programming error. With LF95 just be careful to allocate enough pages each time, until the bug is killed! The following macro reallocates a page, or any array for that matter to a new index range: @ @m _ReallocateArray(array, buffer, new_lb, new_ub) @; ALLOCATE(buffer(new_lb:new_ub)) @; buffer[new_lb:new_lb+SIZE(array)-1]=array @; @%% // {\bf Memory leak:} This {\bf should be a |DEALLOCATE|}, but this crashes LF95! DEALLOCATE(array) @; // Reallocate ALLOCATE(array(new_lb:new_ub)) @; array=buffer @; // Copy the buffer DEALLOCATE(buffer) @; @*2 Node Deallocation. Deallocating nodes is easier--push the empty nodes onto the empty node and check if this was the last used node in the page. If so, the block of nodes in the page is deallocated. At this point we are not sure where in the stack this page is, so we just leave it as it is, although we should really take it off the stack if it is on it (this requires doubly linked lists for $O(1)$ implementations). In this whole scheme, the part I am not yet pleased with is what happens to full pages. At first I tried to have them put at the bottom of the page stack, since later openings might be created. But this creates endless loops of popping off the stack and pushing at the bottom of the stack. The best solution is probably to have a separate stack for either the deallocated pages or the full (at some point in time) pages. I just haven't implemented this yet so the above allocation routine scans through all pages when the page stack is empty. @ @m _DeallocateNode(_key_type, ...) @; SUBROUTINE DeallocateNode_@e@&_key_type(skip_list, free_node) @; _TYPE(SkipList_@e@&_key_type), POINTER :: skip_list @; _TYPE(Node_@e@&_key_type), POINTER :: free_node @; _DeclareAuxilliaryVariables(_key_type) @; INTEGER(KIND=i_byte) :: page, temp_page @; @%% WRITE(*,*) "Deallocating a node from page #", _page(free_node) @; page=_page(free_node) @; @%% _key(free_node)=-huge_@e@&_key_type @; // Mark it as freed node for debugging _PushOnNodeStack(free_node, _free_node(page)) @; _n_nodes(page)=_n_nodes(page)-1 @; IF(_n_nodes(page)==0) THEN @; // Free the page IF(_trace_allocation) THEN @; WRITE(message_print_unit,*) "Deallocating page of nodes and links #", page, & ", n_pages=", _n_pages,", n_elements=",_n_elements @; END IF @; NULLIFY(_free_node(page)) @; DEALLOCATE(_nodes_block(page)) @; DEALLOCATE(_nexts_block(page)) @; @%% DEALLOCATE(_block(page)) @; END IF @; END SUBROUTINE @; // |DeallocateNode| @= _GenerateProcedureBody(AllocateNode) @; _GenerateProcedureBody(DeallocateNode) @; @ @= _GenerateGenericInterface(AllocateNode) @; _GenerateGenericInterface(DeallocateNode) @; @*2 Initialization and Termination. The routines |InitializeSkipList| and |DestroySkipList| are the creator/destructor routines for the skip list data type. |InitializeSkipList| will create a legal skip list from either an empty pointer or a pointer that is already allocated by the caller (this way some of the parameters used by initialization, such as the maximum allowed level in the skip list, can be set by the caller before calling the initialization routine. Please note that no routine should be changes to the list data should be made after the initialization so as to avoid messing something up. I could have boxed the types a couple more levels to make some things private and some public, but care if all that is needed. @ @m _InitializeSkipList(_key_type, ...) @; SUBROUTINE InitializeSkipList_@e@&_key_type(skip_list) @; _TYPE(SkipList_@e@&_key_type), POINTER :: skip_list @; _DeclareAuxilliaryVariables(_key_type) @; INTEGER(KIND=i_byte) :: page @; IF(.NOT.ASSOCIATED(skip_list)) @~ ALLOCATE(skip_list) @; // Create a fresh skip list IF(.NOT.ASSOCIATED(_memory)) THEN @; // Create a virtual memory just for this list ALLOCATE(_memory) @; _page_size=CEILING(@E REAL(_max_elements)/@E REAL(_max_pages)) @; // Estimated page size (plus head and tail) END IF @; IF(.NOT._NOT_NULL(_pages))@~ ALLOCATE(_pages(0:_max_pages)) @; _log_p=LOG(_p) @; IF(_max_levels<=0) @~ _max_levels=_L(_max_elements, _log_p) @; _n_elements=0 @; CALL AllocateNode(skip_list, _head) @; _n_elements=1 @; CALL AllocateNode(skip_list, _tail) @; _n_elements=2 @; _key(_head)=-huge_@e@&_key_type @; // The smallest possible key--{\bf a sentinel} _key(_tail)=huge_@e@&_key_type @; // The largest possible key--{\bf a sentinel} ALLOCATE(_search_finger(_max_levels)) @; DO level=1, _max_levels @; // Point to the sentinel node by default _AssociateNext(_head, _tail, level) @; _AssociateNext(_tail, NULL(), level) @; _AssociateFinger(_search_finger, _head, level) @; END DO @; END SUBROUTINE @; // |InitializeSkipList| @= _GenerateProcedureBody(InitializeSkipList) @; @ @= _GenerateGenericInterface(InitializeSkipList) @; @ The routine |DestroySkipList| will deallocate all memory (note that memory leaks have not been fully investigated yet) associated with a skip list initialized with |InitializeSkipList|. Please note that this will deallocate the skip list pointer even if this was allocated outside. In the current version I explicitly delete all present nodes before a possible memory deallocation. The memory does not have to be deallocated, as indicated by |deallocate_memory|, since other skip lists might share the same virtual memory. @ @m _DestroySkipList(_key_type, ...) @; SUBROUTINE DestroySkipList_@e@&_key_type(skip_list, deallocate_memory) @; _TYPE(SkipList_@e@&_key_type), POINTER :: skip_list @; LOGICAL, INTENT(IN), OPTIONAL :: deallocate_memory @; _DeclareAuxilliaryVariables(_key_type) @; INTEGER :: alloc_status @; LOGICAL :: free_memory @; free_memory=.FALSE. @; IF(PRESENT(deallocate_memory)) @~ free_memory=deallocate_memory @; DEALLOCATE(_search_finger) @; node=>_head @; DeleteNodes: DO @; // Delete all left-over nodes from the list _FindNext(node, next_node, 1) @; CALL DeallocateNode(skip_list, node) @; IF(.NOT.ASSOCIATED(next_node)) @~ EXIT DeleteNodes @; // Reached the end node=>next_node @; END DO DeleteNodes @; IF(free_memory) THEN @; DEALLOCATE(_pages) @; DEALLOCATE(_memory, STAT=alloc_status) @; IF(alloc_status!=0) @~ NULLIFY(_memory) @; END IF @; DEALLOCATE(skip_list, STAT=alloc_status) @; IF(alloc_status!=0) @~ NULLIFY(skip_list) @; END SUBROUTINE @; // |DestroySkipList| @= _GenerateProcedureBody(DestroySkipList) @; @ @= _GenerateGenericInterface(DestroySkipList) @; @*0 Searching, Insertion and Deletion (and Printing) Operations. This section implements the free basic elementary skip list operations. They are all implemented in a similar fashion to that prescribed in the papers by W. Pugh, and are fully coded with the macros defined previously for full ease of maintainance. The central piece of the code is the macro |_SearchSkipList| which searches for a given key and returns a search finger pointing to the forward-most nodes that have a key smaller than the given search node and are of the corresponding level. A search finger is used to start the search if requested via |_use_search_finger|, so that when data locality or locality-of-reference is present the operations are much faster: @ @m _SearchSkipList(ID, _CompareKeys, skip_list, search_key, search_finger) @; IF(_use_search_finger) THEN @; // Start as close as possible to the search finger level=1 @; _FindFinger(search_finger, search_node, level) @; @e#!_CompareKeys(comparison, search_key, _key(search_node)) @; IF(.NOT.comparison) THEN @; // Move search finger as forward as possible PointAhead_@e@&ID: DO @; level=level+1 @; IF(level>_n_levels) @~ EXIT PointAhead_@e@&ID @; _FindFinger(search_finger, node, level) @; _FindNext(node, next_node, level) @; @e#!_CompareKeys(comparison, search_key, _key(next_node)) @; IF(comparison) THEN @; // Jumped too far forward EXIT PointAhead_@e@&ID @; ELSE @; // Save this node search_node=>next_node @; // Jump ahead END IF @; END DO PointAhead_@e@&ID @; ELSE @; // Move back from search finger level=1 @; PointBack_@e@&ID: DO @; level=level+1 @; IF(level>_n_levels) THEN @; // We need to restart the finger to the header search_node=>_head @; EXIT PointBack_@e@&ID @; END IF @; _FindFinger(search_finger, node, level) @; @e#!_CompareKeys(comparison, search_key, _key(node)) @; IF(.NOT.comparison) THEN @; // Found the nearest suitable node further to front search_node=>node @; // Jump ahead EXIT PointBack_@e@&ID @; // Jumped back far enough END IF @; END DO PointBack_@e@&ID @; END IF @; ELSE @; // Start from the beginning of the list level=_n_levels+1 @; search_node=>_head @; END IF @; SearchAhead_@e@&ID: DO @; level=level-1 @; IF(level<1) @~ EXIT SearchAhead_@e@&ID @; _AssociateFinger(search_finger, search_node, level) @; SkipAhead_@e@&ID: DO @; _FindFinger(search_finger, node, level) @; _FindNext(node, next_node, level) @; @e#!_CompareKeys(comparison, search_key, _key(next_node)) @; IF(.NOT.comparison) THEN @; // OK to jump ahead _AssociateFinger(search_finger, next_node, level) @; ELSE @; // Go to next level EXIT SkipAhead_@e@&ID @; END IF @; END DO SkipAhead_@e@&ID @; END DO SearchAhead_@e@&ID @; @m _CompareSingleKeys(comparison, first, second) @; comparison = (first <= second) @; // The equality part is important here! @m _TestEqualitySingleKeys(comparison, first, second) @; comparison = (first == second) @; @*1 Searching. The routine |SearchInSkipList| is basically a wrapper around the search macro and it returns the datum of the node that matched the given key, if the key was actually present in the list, as indicated by the return value |found|: @ @m _SearchInSkipList(_key_type, _CompareKeys, _TestEqualityOfKeys, ...) @; SUBROUTINE SearchInSkipList_@e@&_key_type(skip_list, key, data, found) @; _TYPE(SkipList_@e@&_key_type), POINTER :: skip_list @; _TYPE(_key_type), INTENT(IN) :: key @; _TYPE(Data_@e@&_key_type), INTENT(OUT), OPTIONAL :: data @; LOGICAL, INTENT(OUT), OPTIONAL :: found @; _DeclareAuxilliaryVariables(_key_type) @; @%% WRITE(*,*) "Searching for key: ", key, " in Skip List-->n_elements=",_n_elements @; _SearchSkipList(Search, @e#!_CompareKeys, skip_list, key%my_value, _search_finger) @; _FindFinger(_search_finger, search_node, 1) @; // Find the finger _FindNext(search_node, node, 1) @; // Look at the next node @e#!_TestEqualityOfKeys(comparison, _key(node), key%my_value) @; IF(PRESENT(found)) @~ found=comparison @; IF(PRESENT(data).AND.comparison) @~ data=_data(node) @; // Return the info about the node END SUBROUTINE @; // |SearchInSkipList| @= _GenerateProcedureBody(SearchInSkipList) @; @ @= _GenerateGenericInterface(SearchInSkipList) @; @*1 Insertion. When inserting a node, we simply perform a search to find the location of insertion and then just break some of the next-node links to accomodate the new node. The node is explicitly allocated by |InsertInSkipList| here before insertion (and deallocated upon deletion). Since I do the memory management myself, this is not a problem (the node will most likely not really be deallocated, but just popped onto the empty-node stack): @ @m _InsertInSkipList(_key_type, _CompareKeys, _TestEqualityOfKeys, ...) @; SUBROUTINE InsertInSkipList_@e@&_key_type(skip_list, data) @; _TYPE(SkipList_@e@&_key_type), POINTER :: skip_list @; _TYPE(Data_@e@&_key_type), INTENT(IN) :: data @; _DeclareAuxilliaryVariables(_key_type) @; _TYPE(Node_@e@&_key_type), POINTER :: new_node @; LOGICAL :: duplicate @; CALL SearchInSkipList(skip_list=skip_list, key=data%my_key, found=duplicate) @; IF((.NOT._keep_duplicates).AND.duplicate) @~ RETURN @; // A duplicate key @%% WRITE(*,*) "Inserting node: ", data, " in Skip List-->n_elements=",_n_elements @; CALL AllocateNode(skip_list, new_node) @; _n_elements=_n_elements+1 @; new_node%my_data=data @; node_level=MIN(_max_levels, _max_level(new_node)) @; // Do not cross over |_max_levels| _n_levels=MAX(_n_levels, node_level) @; // The new list level DO level=1, node_level @; _FindFinger(_search_finger, search_node, level) @; // |search_node=>search_finger[level]| _FindNext(search_node, next_node, level) @; // |next_node=>Next(search_finger, level)| _AssociateNext(new_node, next_node, level) @; // |Next(new_node, level)=>next_node| _AssociateNext(search_node, new_node, level) @; // |Next(search_finger, level)=>new_node| END DO @; END SUBROUTINE @; // |InsertInSkipList| @= _GenerateProcedureBody(InsertInSkipList) @; @ @= _GenerateGenericInterface(InsertInSkipList) @; @*1 Deletion. Deletion is just like insertion--search for the key to see if it is present, and then remove the node from the list by bypassing it underneath the pointers that ended or started from it, and finally, deallocating the node. Here I check to see if this was the node of highest level, so as to reduce |_n_levels| and possibly speed operation (this almost never happens in moderatly sized lists): @ @m _DeleteFromSkipList(_key_type, _CompareKeys, _TestEqualityOfKeys, ...) @; SUBROUTINE DeleteFromSkipList_@e@&_key_type(skip_list, key, data, deleted) @; _TYPE(SkipList_@e@&_key_type), POINTER :: skip_list @; _TYPE(_key_type), INTENT(IN) :: key @; _TYPE(Data_@e@&_key_type), INTENT(OUT), OPTIONAL :: data @; LOGICAL, INTENT(OUT), OPTIONAL :: deleted @; _DeclareAuxilliaryVariables(_key_type) @; _TYPE(Node_@e@&_key_type), POINTER :: old_node @; LOGICAL :: found @; /* First perform the standard search for the node: */ IF(.NOT.PRESENT(data)) THEN @; CALL SearchInSkipList(skip_list=skip_list, key=key, found=found) @; ELSE @; CALL SearchInSkipList(skip_list=skip_list, key=key, data=data, found=found) @; END IF @; IF(PRESENT(deleted)) @~ deleted=found @; // If node was found it will be deleted IF(.NOT.found) @~ RETURN @; // Node was not found @%% WRITE(*,*) "Deleting node with key", key @; BypassNode: DO level=1, _n_levels @; _FindFinger(_search_finger, search_node, level) @; _FindNext(search_node, next_node, level) @; IF(level==1) @~ old_node=>next_node @; // This is the node to delete IF(.NOT.ASSOCIATED(old_node, next_node)) @~ EXIT BypassNode @; // We exceeded the level of |node|--no need to store it explicitly _FindNext(old_node, next_node, level) @; _AssociateNext(search_node, next_node, level) @; // Bypass the deleted node END DO BypassNode @; ReduceLevel: DO @; // If the largest-level node was deleted, reduce |n_levels| _FindNext(_head, next_node, _n_levels) @; IF((.NOT.ASSOCIATED(next_node, _tail)).OR.(_n_levels<=1)) @~ EXIT ReduceLevel @; _n_levels=_n_levels-1 @; END DO ReduceLevel @; CALL DeallocateNode(skip_list, old_node) @; // Deallocate the deleted node _n_elements=_n_elements-1 @; END SUBROUTINE @; // |DeleteFromSkipList| @= _GenerateProcedureBody(DeleteFromSkipList) @; @ @= _GenerateGenericInterface(DeleteFromSkipList) @; @*1 Printing. This is just one utility routine for printing the contents of a SkipList. For a descent user-friendly implementation of a linked list, I should really provide a routine for accessingall the keys in a list (so one can see what is in the list), and a modification of |PrintSkipList| could do something like this. I just couldn't decide on the interface of such a "peek in skip list" routine, so I left it out for now. @ @m _key_format "(G10.3)" @%% @ @m _PrintSkipList(_key_type, ...) @; SUBROUTINE PrintSkipList_@e@&_key_type(skip_list) @; _TYPE(SkipList_@e@&_key_type), POINTER :: skip_list @; _DeclareAuxilliaryVariables(_key_type) @; WRITE(*,*) @; _FindNext(_head, node, 1) @; // Do not print the head PrintList: DO @; _FindNext(node, next_node, 1) @; IF(ASSOCIATED(next_node)) THEN @; WRITE(UNIT=*, FMT=_key_format, ADVANCE="NO") _key(node) @; WRITE(UNIT=*, FMT="(A)", ADVANCE="NO") " ---> " _FindLevel(node, node_level) @; DO level=1, node_level @; _FindNext(node, search_node, level) @; WRITE(UNIT=*, FMT=_key_format, ADVANCE="NO") _key(search_node) @; END DO @; WRITE(*,*) @; // New line node=>next_node @; ELSE @; EXIT PrintList @; END IF @; END DO PrintList @; WRITE(*,*) @; END SUBROUTINE @; // |PrintSkipList| @ @= _GenerateProcedureBody(PrintSkipList) @; @ @= _GenerateGenericInterface(PrintSkipList) @; @*0 Test Program. The following is a simple test program which tests the module |Skip_Lists| for integer keys that are partially ordered already (to test performance under the presence of temporal and spatial data locality). It performs insertion, deletion, mixed insertion/deletion stress-test rounds, as well as just pure search, and times these for comparison: @ @= PROGRAM Test_Skip_Lists @; USE Precision @; USE Error_Handling @; USE System_Monitors @; USE Initialization_Termination @; USE Random_Numbers @; USE Sorting_Ranking @; USE Key_Types @; USE Skip_Lists @; IMPLICIT NONE @; INTEGER(KIND=i_wp) :: key @; INTEGER, DIMENSION(:), ALLOCATABLE :: permutation @; TYPE(Data_Key_i_wp) :: data @; TYPE(SkipList_Key_i_wp), POINTER :: skip_list=_NULL @; _DeclareAuxilliaryVariables(Key_i_wp) @; INTEGER :: element, n_elements, min_elements=HUGE(0), max_elements=0, n_reps, reps @; REAL :: variance, disorder, random, elapsed_time @; LOGICAL :: test, found @; CALL StartProgram() @; ALLOCATE(skip_list) @; ALLOCATE(_memory) @; WRITE(*,*) "Enter max_elements, page_size, max_pages, and p" @; READ(*,*) _max_elements, _page_size, _max_pages, _p @; WRITE(*,*) "Enter size variance, key disorder and use_search_finger" READ(*,*) variance, disorder, _use_search_finger @; _max_lists=1 @; // Only 1 list for now _keep_duplicates=.FALSE. @; // No duplicate keys for this timing test _trace_allocation=.FALSE. @; // This is a timing test CALL InitializeSkipList(skip_list) @; CALL RandomUniform(random, range=(/1.0-variance, 1.0+variance/)) @; n_elements=INT(random*@E REAL(_max_elements)) @; // In real life n_elements may be unknown WRITE(*,*) "The maximum size of the list will be:", n_elements @; ALLOCATE(permutation(n_elements)) @; CALL DisorderPermutation(disorder=disorder, permutation=permutation, disorder_distribution='U') @; WRITE(*,*) "Starting insertion of nodes!" @; _TimeCode(CALL Insertion(), 1, 1, elapsed_time) @; @%% CALL Insertion() @; @%% WRITE(*,*) "The level of the list is:", _n_levels, " out of possible ", _max_levels @; WRITE(*,*) "Insertion took:", elapsed_time," seconds." @; WRITE(*,*) "Starting search for disordered keys!" @; _TimeCode(CALL Search(), 2, 1, elapsed_time) @; @%% CALL Search() @; WRITE(*,*) "Search took:", elapsed_time," seconds." @; WRITE(*,*) "Starting allocation stress test of mixed insertion/deletion!" @; _TimeCode(CALL InsertionDeletion(), 3, 1, elapsed_time) @; @%% CALL InsertionDeletion() @; WRITE(*,*) "Mixed insertion/deletion took:", elapsed_time," seconds." @; WRITE(*,*) "Minimum and maximum number of nodes was ", min_elements, max_elements @; WRITE(*,*) "Inserting all nodes back and then deleting them!" @; CALL Insertion() @; // Insert all elements again _TimeCode(CALL Deletion(), 4, 1, elapsed_time) @; @%% CALL Deletion() @; WRITE(*,*) "Deletion took:", elapsed_time," seconds." @; DEALLOCATE(permutation) @; CALL DestroySkipList(skip_list, deallocate_memory=.TRUE.) @; CALL EndProgram() @; CONTAINS @; SUBROUTINE Insertion() @; DO element=1, n_elements @; data%my_key%my_value=permutation[element] @; CALL InsertInSkipList(skip_list, data) @; END DO @; END SUBROUTINE Insertion @; SUBROUTINE Deletion() @; DO element=1, n_elements @; CALL DeleteFromSkipList(skip_list=skip_list, key=Key_i_wp(permutation[element]), & data=data, deleted=found) @; END DO @; END SUBROUTINE Deletion @; SUBROUTINE InsertionDeletion() @; DO element=1, n_elements @; CALL RandomUniform(key, range=(/1,n_elements/)) @; key=permutation[key] @; // A random choice @%% WRITE(*,*) "Trying to delete key: ", key @; CALL DeleteFromSkipList(skip_list=skip_list, key=Key_i_wp(key), deleted=found) @; IF(.NOT.found) THEN @; data%my_key=Key_i_wp(key) @; @%% WRITE(*,*) "Inserting key: ", key @; CALL InsertInSkipList(skip_list=skip_list, data=data) @; END IF @; min_elements=MIN(min_elements, _n_elements) @; max_elements=MAX(max_elements, _n_elements) @; @%% WRITE(*,*) element, _n_elements @; END DO @; END SUBROUTINE InsertionDeletion @; SUBROUTINE Search() @; DO element=1, n_elements @; CALL SearchInSkipList(skip_list=skip_list, key=Key_i_wp(permutation[element]), & data=data, found=found) @; END DO @; END SUBROUTINE Search @; END PROGRAM Test_Skip_Lists @; @%%