Panda3D
vertexDataPage.cxx
Go to the documentation of this file.
1 /**
2  * PANDA 3D SOFTWARE
3  * Copyright (c) Carnegie Mellon University. All rights reserved.
4  *
5  * All use of this software is subject to the terms of the revised BSD
6  * license. You should have received a copy of this license along
7  * with this source code in a file named "LICENSE."
8  *
9  * @file vertexDataPage.cxx
10  * @author drose
11  * @date 2007-06-04
12  */
13 
14 #include "vertexDataPage.h"
15 #include "configVariableInt.h"
16 #include "vertexDataSaveFile.h"
17 #include "vertexDataBook.h"
18 #include "vertexDataBlock.h"
19 #include "pStatTimer.h"
20 #include "memoryHook.h"
21 #include "config_gobj.h"
22 #include <algorithm>
23 
24 #ifdef HAVE_ZLIB
25 #include <zlib.h>
26 #endif
27 
28 ConfigVariableInt max_resident_vertex_data
29 ("max-resident-vertex-data", -1,
30  PRC_DESC("Specifies the maximum number of bytes of all vertex data "
31  "that is allowed to remain resident in system RAM at one time. "
32  "If more than this number of bytes of vertices are created, "
33  "the least-recently-used ones will be temporarily compressed in "
34  "system RAM until they are needed. Set it to -1 for no limit."));
35 
36 ConfigVariableInt max_compressed_vertex_data
37 ("max-compressed-vertex-data", 0,
38  PRC_DESC("Specifies the maximum number of bytes of all vertex data "
39  "that is allowed to remain compressed in system RAM at one time. "
40  "If more than this number of bytes of vertices are created, "
41  "the least-recently-used ones will be temporarily flushed to "
42  "disk until they are needed. Set it to -1 for no limit."));
43 
44 ConfigVariableInt vertex_data_compression_level
45 ("vertex-data-compression-level", 1,
46  PRC_DESC("Specifies the zlib compression level to use when compressing "
47  "vertex data. The number should be in the range 1 to 9, where "
48  "larger values are slower but give better compression."));
49 
50 ConfigVariableInt max_disk_vertex_data
51 ("max-disk-vertex-data", -1,
52  PRC_DESC("Specifies the maximum number of bytes of vertex data "
53  "that is allowed to be written to disk. Set it to -1 for no "
54  "limit."));
55 
56 PT(VertexDataPage::PageThreadManager) VertexDataPage::_thread_mgr;
57 
58 // This is a reference to an allocated Mutex, instead of just a static Mutex,
59 // to protect against ordering issues when the application shuts down.
60 Mutex &VertexDataPage::_tlock = *(new Mutex("VertexDataPage::_tlock"));
61 
62 SimpleLru VertexDataPage::_resident_lru("resident", max_resident_vertex_data);
63 SimpleLru VertexDataPage::_compressed_lru("compressed", max_compressed_vertex_data);
64 SimpleLru VertexDataPage::_disk_lru("disk", 0);
65 SimpleLru VertexDataPage::_pending_lru("pending", 0);
66 
67 SimpleLru *VertexDataPage::_global_lru[RC_end_of_list] = {
68  &VertexDataPage::_resident_lru,
69  &VertexDataPage::_compressed_lru,
70  &VertexDataPage::_disk_lru,
71 };
72 
73 VertexDataSaveFile *VertexDataPage::_save_file;
74 
75 // This mutex is (mostly) unused. We just need a Mutex to pass to the Book
76 // Constructor, below.
77 Mutex VertexDataPage::_unused_mutex;
78 
79 PStatCollector VertexDataPage::_vdata_compress_pcollector("*:Vertex Data:Compress");
80 PStatCollector VertexDataPage::_vdata_decompress_pcollector("*:Vertex Data:Decompress");
81 PStatCollector VertexDataPage::_vdata_save_pcollector("*:Vertex Data:Save");
82 PStatCollector VertexDataPage::_vdata_restore_pcollector("*:Vertex Data:Restore");
83 PStatCollector VertexDataPage::_thread_wait_pcollector("Wait:Idle");
84 PStatCollector VertexDataPage::_alloc_pages_pcollector("System memory:MMap:Vertex data");
85 
86 TypeHandle VertexDataPage::_type_handle;
87 TypeHandle VertexDataPage::DeflatePage::_type_handle;
88 
#if defined(HAVE_ZLIB) && !defined(USE_MEMORY_NOWRAPPERS)
// Allocation hooks that route zlib's internal allocations through Panda's
// memory allocation system.

/**
 * zlib alloc_func: allocates room for items elements of size bytes each.
 * The opaque pointer is not used.  Returns NULL if the total byte count
 * cannot be represented in a size_t, which zlib treats as an out-of-memory
 * condition.
 */
static void *
do_zlib_alloc(voidpf opaque, uInt items, uInt size) {
  (void)opaque;

  // Multiply in size_t rather than in uInt, so the product cannot silently
  // wrap to a too-small request; then verify the product round-trips in
  // case size_t itself is no wider than uInt.
  const size_t total = (size_t)items * (size_t)size;
  if (size != 0 && total / size != items) {
    return nullptr;
  }
  return PANDA_MALLOC_ARRAY(total);
}

/**
 * zlib free_func: releases a buffer previously returned by do_zlib_alloc().
 * The opaque pointer is not used.
 */
static void
do_zlib_free(voidpf opaque, voidpf address) {
  (void)opaque;
  PANDA_FREE_ARRAY(address);
}
#endif  // HAVE_ZLIB && !USE_MEMORY_NOWRAPPERS
100 
101 
102 /**
103  * This constructor is used only by VertexDataBook, to create a mostly-empty
104  * object that can be used to search for a particular page size in the set.
105  */
106 VertexDataPage::
107 VertexDataPage(size_t book_size) :
108  SimpleAllocator(book_size, _unused_mutex),
109  SimpleLruPage(book_size),
110  _book_size(book_size),
111  _block_size(0),
112  _book(nullptr)
113 {
114  _page_data = nullptr;
115  _size = 0;
116  _uncompressed_size = 0;
117  _ram_class = RC_resident;
118  _pending_ram_class = RC_resident;
119 }
120 
121 /**
122  *
123  */
124 VertexDataPage::
125 VertexDataPage(VertexDataBook *book, size_t page_size, size_t block_size) :
126  SimpleAllocator(page_size, book->_lock),
127  SimpleLruPage(page_size),
128  _book_size(page_size),
129  _block_size(block_size),
130  _book(book)
131 {
132  _allocated_size = round_up(page_size);
133  _page_data = alloc_page_data(_allocated_size);
134  _size = page_size;
135 
136  _uncompressed_size = _size;
137  _pending_ram_class = RC_resident;
138  set_ram_class(RC_resident);
139 }
140 
141 /**
142  *
143  */
144 VertexDataPage::
145 ~VertexDataPage() {
146 
147  // Since the only way to delete a page is via the changed_contiguous()
148  // method, the lock will already be held. MutexHolder holder(_lock);
149 
150  {
151  MutexHolder holder2(_tlock);
152  if (_pending_ram_class != _ram_class) {
153  nassertv(_thread_mgr != nullptr);
154  _thread_mgr->remove_page(this);
155  }
156  }
157 
158  if (_page_data != nullptr) {
159  free_page_data(_page_data, _allocated_size);
160  _size = 0;
161  }
162 
163  nassertv(_book == nullptr);
164 }
165 
166 /**
167  * Call this to stop the paging threads, if they were started. This may block
168  * until all of the pending tasks have been completed.
169  */
170 void VertexDataPage::
172  PT(PageThreadManager) thread_mgr;
173  {
174  MutexHolder holder(_tlock);
175  thread_mgr = _thread_mgr;
176  _thread_mgr.clear();
177  }
178 
179  if (thread_mgr != nullptr) {
180  gobj_cat.info()
181  << "Stopping vertex paging threads.\n";
182  thread_mgr->stop_threads();
183  }
184 }
185 
186 /**
187  * Waits for all of the pending thread tasks to finish before returning.
188  */
189 void VertexDataPage::
191  int num_threads = vertex_data_page_threads;
192  if (num_threads == 0) {
193  stop_threads();
194  return;
195  }
196 
197  PT(PageThreadManager) thread_mgr;
198  {
199  MutexHolder holder(_tlock);
200  thread_mgr = _thread_mgr;
201  }
202 
203  if (thread_mgr != nullptr) {
204  thread_mgr->stop_threads();
205  MutexHolder holder(_tlock);
206  thread_mgr->start_threads(num_threads);
207  }
208 }
209 
210 /**
211  *
212  */
213 void VertexDataPage::
214 output(std::ostream &out) const {
215  SimpleAllocator::output(out);
216 }
217 
218 /**
219  *
220  */
221 void VertexDataPage::
222 write(std::ostream &out, int indent_level) const {
223  SimpleAllocator::write(out);
224 }
225 
226 /**
227  * Creates a new SimpleAllocatorBlock object. Override this function to
228  * specialize the block type returned.
229  */
230 SimpleAllocatorBlock *VertexDataPage::
231 make_block(size_t start, size_t size) {
232  return new VertexDataBlock(this, start, size);
233 }
234 
235 /**
236  * This callback function is made whenever the estimate of contiguous
237  * available space changes, either through an alloc or free. The lock will be
238  * held.
239  */
240 void VertexDataPage::
241 changed_contiguous() {
242  if (do_is_empty()) {
243  // If the page is now empty, delete it.
244  VertexDataBook::Pages::iterator pi = _book->_pages.find(this);
245  nassertv(pi != _book->_pages.end());
246  _book->_pages.erase(pi);
247  _book = nullptr;
248  delete this;
249  return;
250  }
251 
252  adjust_book_size();
253 }
254 
255 /**
256  * Evicts the page from the LRU. Called internally when the LRU determines
257  * that it is full. May also be called externally when necessary to
258  * explicitly evict the page.
259  *
260  * It is legal for this method to either evict the page as requested, do
261  * nothing (in which case the eviction will be requested again at the next
262  * epoch), or requeue itself on the tail of the queue (in which case the
263  * eviction will be requested again much later).
264  */
265 void VertexDataPage::
266 evict_lru() {
267  MutexHolder holder(_lock);
268 
269  switch (_ram_class) {
270  case RC_resident:
271  if (_compressed_lru.get_max_size() == 0) {
272  request_ram_class(RC_disk);
273  } else {
274  request_ram_class(RC_compressed);
275  }
276  break;
277 
278  case RC_compressed:
279  request_ram_class(RC_disk);
280  break;
281 
282  case RC_disk:
283  case RC_end_of_list:
284  gobj_cat.warning()
285  << "Internal error: attempt to evict array data " << this
286  << " in inappropriate state " << _ram_class << ".\n";
287  break;
288  }
289 }
290 
291 /**
292  * Allocates a new block. Returns NULL if a block of the requested size
293  * cannot be allocated.
294  *
295  * To free the allocated block, call block->free(), or simply delete the block
296  * pointer.
297  *
298  * Assumes the lock is already held.
299  */
300 VertexDataBlock *VertexDataPage::
301 do_alloc(size_t size) {
302  VertexDataBlock *block = (VertexDataBlock *)SimpleAllocator::do_alloc(size);
303 
304  if (block != nullptr && _ram_class != RC_disk) {
305  // When we allocate a new block within a resident page, we have to clear
306  // the disk cache (since we have just invalidated it).
307  _saved_block.clear();
308  }
309 
310  return block;
311 }
312 
313 /**
314  * Short-circuits the thread and forces the page into resident status
315  * immediately.
316  *
317  * Intended to be called from the main thread. Assumes the lock is already
318  * held.
319  */
320 void VertexDataPage::
321 make_resident_now() {
322  MutexHolder holder(_tlock);
323  if (_pending_ram_class != _ram_class) {
324  nassertv(_thread_mgr != nullptr);
325  _thread_mgr->remove_page(this);
326  }
327 
328  make_resident();
329  _pending_ram_class = RC_resident;
330 }
331 
332 /**
333  * Moves the page to fully resident status by expanding it or reading it from
334  * disk as necessary.
335  *
336  * Intended to be called from the sub-thread. Assumes the lock is already
337  * held.
338  */
339 void VertexDataPage::
340 make_resident() {
341  if (_ram_class == RC_resident) {
342  mark_used_lru();
343  return;
344  }
345 
346  if (_ram_class == RC_disk) {
347  do_restore_from_disk();
348  }
349 
350  if (_ram_class == RC_compressed) {
351 #ifdef HAVE_ZLIB
352  PStatTimer timer(_vdata_decompress_pcollector);
353 
354  if (gobj_cat.is_debug()) {
355  gobj_cat.debug()
356  << "Expanding page from " << _size
357  << " to " << _uncompressed_size << "\n";
358  }
359  size_t new_allocated_size = round_up(_uncompressed_size);
360  unsigned char *new_data = alloc_page_data(new_allocated_size);
361  unsigned char *end_data = new_data + new_allocated_size;
362 
363  z_stream z_source;
364 #ifdef USE_MEMORY_NOWRAPPERS
365  z_source.zalloc = Z_NULL;
366  z_source.zfree = Z_NULL;
367 #else
368  z_source.zalloc = (alloc_func)&do_zlib_alloc;
369  z_source.zfree = (free_func)&do_zlib_free;
370 #endif
371 
372  z_source.opaque = Z_NULL;
373  z_source.msg = (char *) "no error message";
374 
375  z_source.next_in = (Bytef *)(char *)_page_data;
376  z_source.avail_in = _size;
377  z_source.next_out = (Bytef *)new_data;
378  z_source.avail_out = new_allocated_size;
379 
380  int result = inflateInit(&z_source);
381  if (result < 0) {
382  nassert_raise("zlib error");
383  return;
384  }
386 
387  size_t output_size = 0;
388 
389  int flush = 0;
390  result = 0;
391  while (result != Z_STREAM_END) {
392  unsigned char *start_out = (unsigned char *)z_source.next_out;
393  nassertv(start_out < end_data);
394  z_source.avail_out = std::min((size_t)(end_data - start_out), (size_t)inflate_page_size);
395  nassertv(z_source.avail_out != 0);
396  result = inflate(&z_source, flush);
397  if (result < 0 && result != Z_BUF_ERROR) {
398  nassert_raise("zlib error");
399  return;
400  }
401  size_t bytes_produced = (size_t)((unsigned char *)z_source.next_out - start_out);
402  output_size += bytes_produced;
403  if (bytes_produced == 0) {
404  // If we ever produce no bytes, then start flushing the output.
405  flush = Z_FINISH;
406  }
407 
409  }
410  nassertv(z_source.avail_in == 0);
411  nassertv(output_size == _uncompressed_size);
412 
413  result = inflateEnd(&z_source);
414  nassertv(result == Z_OK);
415 
416  free_page_data(_page_data, _allocated_size);
417  _page_data = new_data;
418  _size = _uncompressed_size;
419  _allocated_size = new_allocated_size;
420 #endif
421 
422  set_lru_size(_size);
423  set_ram_class(RC_resident);
424  }
425 }
426 
427 /**
428  * Moves the page to compressed status by compressing it or reading it from
429  * disk as necessary.
430  *
431  * Assumes the lock is already held.
432  */
433 void VertexDataPage::
434 make_compressed() {
435  if (_ram_class == RC_compressed) {
436  // If we're already compressed, just mark the page recently used.
437  mark_used_lru();
438  return;
439  }
440 
441  if (_ram_class == RC_disk) {
442  do_restore_from_disk();
443  }
444 
445  if (_ram_class == RC_resident) {
446  nassertv(_size == _uncompressed_size);
447 
448 #ifdef HAVE_ZLIB
449  PStatTimer timer(_vdata_compress_pcollector);
450 
451  DeflatePage *page = new DeflatePage;
452  DeflatePage *head = page;
453 
454  z_stream z_dest;
455 #ifdef USE_MEMORY_NOWRAPPERS
456  z_dest.zalloc = Z_NULL;
457  z_dest.zfree = Z_NULL;
458 #else
459  z_dest.zalloc = (alloc_func)&do_zlib_alloc;
460  z_dest.zfree = (free_func)&do_zlib_free;
461 #endif
462 
463  z_dest.opaque = Z_NULL;
464  z_dest.msg = (char *) "no error message";
465 
466  int result = deflateInit(&z_dest, vertex_data_compression_level);
467  if (result < 0) {
468  nassert_raise("zlib error");
469  return;
470  }
472 
473  z_dest.next_in = (Bytef *)(char *)_page_data;
474  z_dest.avail_in = _uncompressed_size;
475  size_t output_size = 0;
476 
477  // Compress the data into one or more individual pages. We have to
478  // compress it page-at-a-time, since we're not really sure how big the
479  // result will be (so we can't easily pre-allocate a buffer).
480  int flush = 0;
481  result = 0;
482  while (result != Z_STREAM_END) {
483  unsigned char *start_out = (page->_buffer + page->_used_size);
484  z_dest.next_out = (Bytef *)start_out;
485  z_dest.avail_out = (size_t)deflate_page_size - page->_used_size;
486  if (z_dest.avail_out == 0) {
487  DeflatePage *new_page = new DeflatePage;
488  page->_next = new_page;
489  page = new_page;
490  start_out = page->_buffer;
491  z_dest.next_out = (Bytef *)start_out;
492  z_dest.avail_out = deflate_page_size;
493  }
494 
495  result = deflate(&z_dest, flush);
496  if (result < 0 && result != Z_BUF_ERROR) {
497  nassert_raise("zlib error");
498  return;
499  }
500  size_t bytes_produced = (size_t)((unsigned char *)z_dest.next_out - start_out);
501  page->_used_size += bytes_produced;
502  nassertv(page->_used_size <= deflate_page_size);
503  output_size += bytes_produced;
504  if (bytes_produced == 0) {
505  // If we ever produce no bytes, then start flushing the output.
506  flush = Z_FINISH;
507  }
508 
510  }
511  nassertv(z_dest.avail_in == 0);
512 
513  result = deflateEnd(&z_dest);
514  nassertv(result == Z_OK);
515 
516  // Now we know how big the result will be. Allocate a buffer, and copy
517  // the data from the various pages.
518 
519  size_t new_allocated_size = round_up(output_size);
520  unsigned char *new_data = alloc_page_data(new_allocated_size);
521 
522  size_t copied_size = 0;
523  unsigned char *p = new_data;
524  page = head;
525  while (page != nullptr) {
526  memcpy(p, page->_buffer, page->_used_size);
527  copied_size += page->_used_size;
528  p += page->_used_size;
529  DeflatePage *next = page->_next;
530  delete page;
531  page = next;
532  }
533  nassertv(copied_size == output_size);
534 
535  // Now free the original, uncompressed data, and put this new compressed
536  // buffer in its place.
537  free_page_data(_page_data, _allocated_size);
538  _page_data = new_data;
539  _size = output_size;
540  _allocated_size = new_allocated_size;
541 
542  if (gobj_cat.is_debug()) {
543  gobj_cat.debug()
544  << "Compressed " << *this << " from " << _uncompressed_size
545  << " to " << _size << "\n";
546  }
547 #endif
548  set_lru_size(_size);
549  set_ram_class(RC_compressed);
550  }
551 }
552 
553 /**
554  * Moves the page to disk status by writing it to disk as necessary.
555  *
556  * Assumes the lock is already held.
557  */
558 void VertexDataPage::
559 make_disk() {
560  if (_ram_class == RC_disk) {
561  // If we're already on disk, just mark the page recently used.
562  mark_used_lru();
563  return;
564  }
565 
566  if (_ram_class == RC_resident || _ram_class == RC_compressed) {
567  if (!do_save_to_disk()) {
568  // Can't save it to disk for some reason.
569  gobj_cat.warning()
570  << "Couldn't save page " << this << " to disk.\n";
571  mark_used_lru();
572  return;
573  }
574 
575  free_page_data(_page_data, _allocated_size);
576  _page_data = nullptr;
577  _size = 0;
578 
579  set_ram_class(RC_disk);
580  }
581 }
582 
583 /**
584  * Writes the page to disk, but does not evict it from memory or affect its
585  * LRU status. If it gets evicted later without having been modified, it will
586  * not need to write itself to disk again.
587  *
588  * Returns true on success, false on failure. Assumes the lock is already
589  * held.
590  */
591 bool VertexDataPage::
592 do_save_to_disk() {
593  if (_ram_class == RC_resident || _ram_class == RC_compressed) {
594  PStatTimer timer(_vdata_save_pcollector);
595 
596  if (_saved_block == nullptr) {
597  if (gobj_cat.is_debug()) {
598  gobj_cat.debug()
599  << "Storing page, " << _size << " bytes, to disk\n";
600  }
601 
602  bool compressed = (_ram_class == RC_compressed);
603 
604  _saved_block = get_save_file()->write_data(_page_data, _allocated_size, compressed);
605  if (_saved_block == nullptr) {
606  // Can't write it to disk. Too bad.
607  return false;
608  }
609  } else {
610  if (gobj_cat.is_debug()) {
611  gobj_cat.debug()
612  << "Page already stored: " << _size << " bytes\n";
613  }
614  }
615  }
616 
617  return true;
618 }
619 
620 /**
621  * Restores the page from disk and makes it either compressed or resident
622  * (according to whether it was stored compressed on disk).
623  *
624  * Assumes the lock is already held.
625  */
626 void VertexDataPage::
627 do_restore_from_disk() {
628  if (_ram_class == RC_disk) {
629  nassertv(_saved_block != nullptr);
630  nassertv(_page_data == nullptr && _size == 0);
631 
632  PStatTimer timer(_vdata_restore_pcollector);
633 
634  size_t buffer_size = _saved_block->get_size();
635  if (gobj_cat.is_debug()) {
636  gobj_cat.debug()
637  << "Restoring page, " << buffer_size << " bytes, from disk\n";
638  }
639 
640  size_t new_allocated_size = round_up(buffer_size);
641  unsigned char *new_data = alloc_page_data(new_allocated_size);
642  if (!get_save_file()->read_data(new_data, new_allocated_size, _saved_block)) {
643  nassert_raise("read error");
644  }
645 
646  nassertv(_page_data == nullptr);
647  _page_data = new_data;
648  _size = buffer_size;
649  _allocated_size = new_allocated_size;
650 
651  set_lru_size(_size);
652  if (_saved_block->get_compressed()) {
653  set_ram_class(RC_compressed);
654  } else {
655  set_ram_class(RC_resident);
656  }
657  }
658 }
659 
660 /**
661  * Called when the "book size"--the size of the page as recorded in its book's
662  * table--has changed for some reason. Assumes the lock is held.
663  */
664 void VertexDataPage::
665 adjust_book_size() {
666  size_t new_size = _contiguous;
667  if (_ram_class != RC_resident) {
668  // Let's not attempt to allocate new buffers from non-resident pages.
669  new_size = 0;
670  }
671 
672  if (_book != nullptr && new_size != _book_size) {
673  VertexDataBook::Pages::iterator pi = _book->_pages.find(this);
674  nassertv(pi != _book->_pages.end());
675  _book->_pages.erase(pi);
676 
677  _book_size = new_size;
678  bool inserted = _book->_pages.insert(this).second;
679  nassertv(inserted);
680  }
681 }
682 
683 /**
684  * Requests the thread set the page to the indicated ram class (if we are
685  * using threading). The page will be enqueued in the thread, which will
686  * eventually be responsible for setting the requested ram class.
687  *
688  * Assumes the page's lock is already held.
689  */
690 void VertexDataPage::
691 request_ram_class(RamClass ram_class) {
692  int num_threads = vertex_data_page_threads;
693  if (num_threads == 0 || !Thread::is_threading_supported()) {
694  // No threads. Do it immediately.
695  switch (ram_class) {
696  case RC_resident:
697  make_resident();
698  break;
699 
700  case RC_compressed:
701  make_compressed();
702  break;
703 
704  case RC_disk:
705  make_disk();
706  break;
707 
708  case RC_end_of_list:
709  break;
710  }
711  _pending_ram_class = ram_class;
712  return;
713  }
714 
715  MutexHolder holder(_tlock);
716  if (_thread_mgr == nullptr) {
717  // Create the thread manager.
718  gobj_cat.info()
719  << "Spawning " << num_threads << " vertex paging threads.\n";
720  _thread_mgr = new PageThreadManager(num_threads);
721  }
722 
723  _thread_mgr->add_page(this, ram_class);
724 }
725 
726 /**
727  * Creates the global VertexDataSaveFile that will be used to save vertex data
728  * buffers to disk when necessary.
729  */
730 void VertexDataPage::
731 make_save_file() {
732  size_t max_size = (size_t)max_disk_vertex_data;
733 
734  _save_file = new VertexDataSaveFile(vertex_save_file_directory,
735  vertex_save_file_prefix, max_size);
736 }
737 
738 /**
739  * Allocates and returns a freshly-allocated buffer of at least the indicated
740  * size for holding vertex data.
741  */
742 unsigned char *VertexDataPage::
743 alloc_page_data(size_t page_size) const {
744  _alloc_pages_pcollector.add_level_now(page_size);
745  return (unsigned char *)memory_hook->mmap_alloc(page_size, false);
746 }
747 
748 /**
749  * Releases a buffer allocated via alloc_page_data().
750  */
751 void VertexDataPage::
752 free_page_data(unsigned char *page_data, size_t page_size) const {
753  _alloc_pages_pcollector.sub_level_now(page_size);
754  memory_hook->mmap_free(page_data, page_size);
755 }
756 
757 /**
758  * Assumes _tlock is held.
759  */
760 VertexDataPage::PageThreadManager::
761 PageThreadManager(int num_threads) :
762  _shutdown(false),
763  _pending_cvar(_tlock)
764 {
765  start_threads(num_threads);
766 }
767 
768 /**
769  * Enqueues the indicated page on the thread queue to convert it to the
770  * specified ram class.
771  *
772  * It is assumed the page's lock is already held, and that _tlock is already
773  * held.
774  */
775 void VertexDataPage::PageThreadManager::
776 add_page(VertexDataPage *page, RamClass ram_class) {
777  nassertv(!_shutdown);
778 
779  if (page->_pending_ram_class == ram_class) {
780  // It's already queued.
781  nassertv(page->get_lru() == &_pending_lru);
782  return;
783  }
784 
785  if (page->_pending_ram_class != page->_ram_class) {
786  // It's already queued, but for a different ram class. Dequeue it so we
787  // can requeue it.
788  remove_page(page);
789  }
790 
791  if (page->_pending_ram_class != ram_class) {
792  // First, move the page to the "pending" LRU. When it eventually gets its
793  // requested ram class set, it will be requeued on the appropriate live
794  // LRU.
795  page->mark_used_lru(&_pending_lru);
796 
797  page->_pending_ram_class = ram_class;
798  if (ram_class == RC_resident) {
799  _pending_reads.push_back(page);
800  } else {
801  _pending_writes.push_back(page);
802  }
803  _pending_cvar.notify();
804  }
805 }
806 
807 /**
808  * Dequeues the indicated page and removes it from the pending task list.
809  *
810  * It is assumed the page's lock is already held, and that _tlock is already
811  * held.
812  */
813 void VertexDataPage::PageThreadManager::
814 remove_page(VertexDataPage *page) {
815  nassertv(page != nullptr);
816 
817  PageThreads::iterator ti;
818  for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
819  PageThread *thread = (*ti);
820  if (page == thread->_working_page) {
821  // Oops, this thread is currently working on this one. We'll have to
822  // wait for the thread to finish.
823  page->_lock.release();
824  while (page == thread->_working_page) {
825  thread->_working_cvar.wait();
826  }
827  page->_lock.acquire();
828  return;
829  }
830  }
831 
832  if (page->_pending_ram_class == RC_resident) {
833  PendingPages::iterator pi =
834  find(_pending_reads.begin(), _pending_reads.end(), page);
835  nassertv(pi != _pending_reads.end());
836  _pending_reads.erase(pi);
837  } else {
838  PendingPages::iterator pi =
839  find(_pending_writes.begin(), _pending_writes.end(), page);
840  nassertv(pi != _pending_writes.end());
841  _pending_writes.erase(pi);
842  }
843 
844  page->_pending_ram_class = page->_ram_class;
845 
846  // Put the page back on its proper LRU.
847  page->mark_used_lru(_global_lru[page->_ram_class]);
848 }
849 
850 /**
851  * Returns the number of threads active on the thread manager. Assumes _tlock
852  * is held.
853  */
854 int VertexDataPage::PageThreadManager::
855 get_num_threads() const {
856  return (int)_threads.size();
857 }
858 
859 /**
860  * Returns the number of read requests waiting on the queue. Assumes _tlock
861  * is held.
862  */
863 int VertexDataPage::PageThreadManager::
864 get_num_pending_reads() const {
865  return (int)_pending_reads.size();
866 }
867 
868 /**
869  * Returns the number of write requests waiting on the queue. Assumes _tlock
870  * is held.
871  */
872 int VertexDataPage::PageThreadManager::
873 get_num_pending_writes() const {
874  return (int)_pending_writes.size();
875 }
876 
877 /**
878  * Adds the indicated of threads to the list of active threads. Assumes
879  * _tlock is held.
880  */
881 void VertexDataPage::PageThreadManager::
882 start_threads(int num_threads) {
883  _shutdown = false;
884 
885  _threads.reserve(num_threads);
886  for (int i = 0; i < num_threads; ++i) {
887  std::ostringstream name_strm;
888  name_strm << "VertexDataPage" << _threads.size();
889  PT(PageThread) thread = new PageThread(this, name_strm.str());
890  thread->start(TP_low, true);
891  _threads.push_back(thread);
892  }
893 }
894 
895 /**
896  * Signals all the threads to stop and waits for them. Does not return until
897  * the threads have finished. Assumes _tlock is *not* held.
898  */
899 void VertexDataPage::PageThreadManager::
900 stop_threads() {
901  PageThreads threads;
902  {
903  MutexHolder holder(_tlock);
904  _shutdown = true;
905  _pending_cvar.notify_all();
906  threads.swap(_threads);
907  }
908 
909  PageThreads::iterator ti;
910  for (ti = threads.begin(); ti != threads.end(); ++ti) {
911  PageThread *thread = (*ti);
912  thread->join();
913  }
914 
915  nassertv(_pending_reads.empty() && _pending_writes.empty());
916 }
917 
918 /**
919  *
920  */
921 VertexDataPage::PageThread::
922 PageThread(PageThreadManager *manager, const std::string &name) :
923  Thread(name, name),
924  _manager(manager),
925  _working_cvar(_tlock)
926 {
927 }
928 
929 /**
930  * The main processing loop for each sub-thread.
931  */
932 void VertexDataPage::PageThread::
933 thread_main() {
934  _tlock.acquire();
935 
936  while (true) {
937  PStatClient::thread_tick(get_sync_name());
938 
939  while (_manager->_pending_reads.empty() &&
940  _manager->_pending_writes.empty()) {
941  if (_manager->_shutdown) {
942  _tlock.release();
943  return;
944  }
945  PStatTimer timer(_thread_wait_pcollector);
946  _manager->_pending_cvar.wait();
947  }
948 
949  // Reads always have priority.
950  if (!_manager->_pending_reads.empty()) {
951  _working_page = _manager->_pending_reads.front();
952  _manager->_pending_reads.pop_front();
953  } else {
954  _working_page = _manager->_pending_writes.front();
955  _manager->_pending_writes.pop_front();
956  }
957 
958  RamClass ram_class = _working_page->_pending_ram_class;
959  _tlock.release();
960 
961  {
962  MutexHolder holder(_working_page->_lock);
963  switch (ram_class) {
964  case RC_resident:
965  _working_page->make_resident();
966  break;
967 
968  case RC_compressed:
969  _working_page->make_compressed();
970  break;
971 
972  case RC_disk:
973  _working_page->make_disk();
974  break;
975 
976  case RC_end_of_list:
977  break;
978  }
979  }
980 
981  _tlock.acquire();
982 
983  _working_page = nullptr;
984  _working_cvar.notify();
985 
987  }
988 }
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
An implementation of a very simple LRU algorithm.
Definition: simpleLru.h:28
A block of bytes that holds one or more VertexDataBlocks.
A temporary file to hold the vertex data that has been evicted from memory and written to disk.
An implementation of a very simple block allocator.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
void mark_used_lru() const
To be called when the page is used; this will move it to the tail of the SimpleLru queue it is alread...
Definition: simpleLru.I:152
get_save_file
Returns the global VertexDataSaveFile that will be used to save vertex data buffers to disk when nece...
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A standard mutex, or mutual exclusion lock.
Definition: pmutex.h:38
A lightweight class that can be used to automatically start and stop a PStatCollector around a sectio...
Definition: pStatTimer.h:30
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A lightweight C++ object whose constructor calls acquire() and whose destructor calls release() on a ...
Definition: mutexHolder.h:25
static void consider_yield()
Possibly suspends the current thread for the rest of the current epoch, if it has run for enough this...
Definition: thread.I:212
void set_lru_size(size_t lru_size)
Specifies the size of this page, presumably in bytes, although any unit is possible.
Definition: simpleLru.I:180
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A block of bytes that stores the actual raw vertex data referenced by a GeomVertexArrayData object.
A single block as returned from SimpleAllocator::alloc().
A lightweight class that represents a single element that may be timed and/or counted via stats.
virtual void mmap_free(void *ptr, size_t size)
Frees a block of memory previously allocated via mmap_alloc().
Definition: memoryHook.cxx:566
One atomic piece that may be managed by a SimpleLru chain.
Definition: simpleLru.h:65
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
A collection of VertexDataPages, which can be used to allocate new VertexDataBlock objects.
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
static void stop_threads()
Call this to stop the paging threads, if they were started.
A thread; that is, a lightweight process.
Definition: thread.h:46
size_t get_max_size() const
Returns the max size of all objects that are allowed to be active on the LRU.
Definition: simpleLru.I:28
This is a convenience class to specialize ConfigVariable as an integer type.
SimpleLru * get_lru() const
Returns the LRU that manages this page, or NULL if it is not currently managed by any LRU.
Definition: simpleLru.I:125
PANDA 3D SOFTWARE Copyright (c) Carnegie Mellon University.
TypeHandle is the identifier used to differentiate C++ class types.
Definition: typeHandle.h:81
virtual void * mmap_alloc(size_t size, bool allow_exec)
Allocates a raw page or pages of memory directly from the OS.
Definition: memoryHook.cxx:512
static void flush_threads()
Waits for all of the pending thread tasks to finish before returning.