vertexDataPage.cxx
/**
 * PANDA 3D SOFTWARE
 * Copyright (c) Carnegie Mellon University. All rights reserved.
 *
 * All use of this software is subject to the terms of the revised BSD
 * license. You should have received a copy of this license along
 * with this source code in a file named "LICENSE."
 *
 * @file vertexDataPage.cxx
 * @author drose
 * @date 2007-06-04
 */

#include "vertexDataPage.h"
#include "configVariableInt.h"
#include "vertexDataSaveFile.h"
#include "vertexDataBook.h"
#include "vertexDataBlock.h"
#include "pStatTimer.h"
#include "memoryHook.h"
#include "config_gobj.h"
#include <algorithm>

#ifdef HAVE_ZLIB
#include <zlib.h>
#endif

ConfigVariableInt max_resident_vertex_data
("max-resident-vertex-data", -1,
 PRC_DESC("Specifies the maximum number of bytes of all vertex data "
          "that is allowed to remain resident in system RAM at one time. "
          "If more than this number of bytes of vertices are created, "
          "the least-recently-used ones will be temporarily compressed in "
          "system RAM until they are needed. Set it to -1 for no limit."));

ConfigVariableInt max_compressed_vertex_data
("max-compressed-vertex-data", 0,
 PRC_DESC("Specifies the maximum number of bytes of all vertex data "
          "that is allowed to remain compressed in system RAM at one time. "
          "If more than this number of bytes of vertices are created, "
          "the least-recently-used ones will be temporarily flushed to "
          "disk until they are needed. Set it to -1 for no limit."));

ConfigVariableInt vertex_data_compression_level
("vertex-data-compression-level", 1,
 PRC_DESC("Specifies the zlib compression level to use when compressing "
          "vertex data. The number should be in the range 1 to 9, where "
          "larger values are slower but give better compression."));

ConfigVariableInt max_disk_vertex_data
("max-disk-vertex-data", -1,
 PRC_DESC("Specifies the maximum number of bytes of vertex data "
          "that is allowed to be written to disk. Set it to -1 for no "
          "limit."));

PT(VertexDataPage::PageThreadManager) VertexDataPage::_thread_mgr;

// This is a reference to an allocated Mutex, instead of just a static Mutex,
// to protect against ordering issues when the application shuts down.
Mutex &VertexDataPage::_tlock = *(new Mutex("VertexDataPage::_tlock"));

SimpleLru VertexDataPage::_resident_lru("resident", max_resident_vertex_data);
SimpleLru VertexDataPage::_compressed_lru("compressed", max_compressed_vertex_data);
SimpleLru VertexDataPage::_disk_lru("disk", 0);
SimpleLru VertexDataPage::_pending_lru("pending", 0);

SimpleLru *VertexDataPage::_global_lru[RC_end_of_list] = {
  &VertexDataPage::_resident_lru,
  &VertexDataPage::_compressed_lru,
  &VertexDataPage::_disk_lru,
};

VertexDataSaveFile *VertexDataPage::_save_file;

// This mutex is (mostly) unused. We just need a Mutex to pass to the Book
// Constructor, below.
Mutex VertexDataPage::_unused_mutex;

PStatCollector VertexDataPage::_vdata_compress_pcollector("*:Vertex Data:Compress");
PStatCollector VertexDataPage::_vdata_decompress_pcollector("*:Vertex Data:Decompress");
PStatCollector VertexDataPage::_vdata_save_pcollector("*:Vertex Data:Save");
PStatCollector VertexDataPage::_vdata_restore_pcollector("*:Vertex Data:Restore");
PStatCollector VertexDataPage::_thread_wait_pcollector("Wait:Idle");
PStatCollector VertexDataPage::_alloc_pages_pcollector("System memory:MMap:Vertex data");

TypeHandle VertexDataPage::_type_handle;
TypeHandle VertexDataPage::DeflatePage::_type_handle;

#if defined(HAVE_ZLIB) && !defined(USE_MEMORY_NOWRAPPERS)
// Define functions that hook zlib into panda's memory allocation system.
static void *
do_zlib_alloc(voidpf opaque, uInt items, uInt size) {
  return PANDA_MALLOC_ARRAY(items * size);
}
static void
do_zlib_free(voidpf opaque, voidpf address) {
  PANDA_FREE_ARRAY(address);
}
#endif  // HAVE_ZLIB && !USE_MEMORY_NOWRAPPERS


/**
 * This constructor is used only by VertexDataBook, to create a mostly-empty
 * object that can be used to search for a particular page size in the set.
 */
VertexDataPage::
VertexDataPage(size_t book_size) :
  SimpleAllocator(book_size, _unused_mutex),
  SimpleLruPage(book_size),
  _book_size(book_size),
  _block_size(0),
  _book(nullptr)
{
  _page_data = nullptr;
  _size = 0;
  _uncompressed_size = 0;
  _ram_class = RC_resident;
  _pending_ram_class = RC_resident;
}

/**
 *
 */
VertexDataPage::
VertexDataPage(VertexDataBook *book, size_t page_size, size_t block_size) :
  SimpleAllocator(page_size, book->_lock),
  SimpleLruPage(page_size),
  _book_size(page_size),
  _block_size(block_size),
  _book(book)
{
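  // Note that the actual allocation is rounded up from the requested page
  // size; see round_up().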
  _allocated_size = round_up(page_size);
  _page_data = alloc_page_data(_allocated_size);
  _size = page_size;

  _uncompressed_size = _size;
  _pending_ram_class = RC_resident;
  set_ram_class(RC_resident);
}

/**
 *
 */
VertexDataPage::
~VertexDataPage() {

  // Since the only way to delete a page is via the changed_contiguous()
  // method, the lock will already be held. MutexHolder holder(_lock);

  {
    MutexHolder holder2(_tlock);
    if (_pending_ram_class != _ram_class) {
      nassertv(_thread_mgr != nullptr);
      _thread_mgr->remove_page(this);
    }
  }

  if (_page_data != nullptr) {
    free_page_data(_page_data, _allocated_size);
    _size = 0;
  }

  nassertv(_book == nullptr);
}

/**
 * Call this to stop the paging threads, if they were started. This may block
 * until all of the pending tasks have been completed.
 */
void VertexDataPage::
stop_threads() {
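  // Grab the manager pointer while holding _tlock, but call stop_threads()
  // only after releasing it, since that call blocks on the threads.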
  PT(PageThreadManager) thread_mgr;
  {
    MutexHolder holder(_tlock);
    thread_mgr = _thread_mgr;
    _thread_mgr.clear();
  }

  if (thread_mgr != nullptr) {
    gobj_cat.info()
      << "Stopping vertex paging threads.\n";
    thread_mgr->stop_threads();
  }
}

/**
 * Waits for all of the pending thread tasks to finish before returning.
 */
void VertexDataPage::
flush_threads() {
  int num_threads = vertex_data_page_threads;
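  // If paging threads are configured off, just make sure any previously
  // started threads are stopped.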
  if (num_threads == 0) {
    stop_threads();
    return;
  }

  PT(PageThreadManager) thread_mgr;
  {
    MutexHolder holder(_tlock);
    thread_mgr = _thread_mgr;
  }

  if (thread_mgr != nullptr) {
    thread_mgr->stop_threads();
    MutexHolder holder(_tlock);
    thread_mgr->start_threads(num_threads);
  }
}

/**
 *
 */
void VertexDataPage::
output(std::ostream &out) const {
  SimpleAllocator::output(out);
}

/**
 *
 */
void VertexDataPage::
write(std::ostream &out, int indent_level) const {
  SimpleAllocator::write(out);
}

/**
 * Creates a new SimpleAllocatorBlock object. Override this function to
 * specialize the block type returned.
 */
SimpleAllocatorBlock *VertexDataPage::
make_block(size_t start, size_t size) {
  return new VertexDataBlock(this, start, size);
}

/**
 * This callback function is made whenever the estimate of contiguous
 * available space changes, either through an alloc or free. The lock will be
 * held.
 */
void VertexDataPage::
changed_contiguous() {
  if (do_is_empty()) {
    // If the page is now empty, delete it.
    VertexDataBook::Pages::iterator pi = _book->_pages.find(this);
    nassertv(pi != _book->_pages.end());
    _book->_pages.erase(pi);
    _book = nullptr;
    delete this;
    return;
  }

  adjust_book_size();
}

/**
 * Evicts the page from the LRU. Called internally when the LRU determines
 * that it is full. May also be called externally when necessary to
 * explicitly evict the page.
 *
 * It is legal for this method to either evict the page as requested, do
 * nothing (in which case the eviction will be requested again at the next
 * epoch), or requeue itself on the tail of the queue (in which case the
 * eviction will be requested again much later).
 */
void VertexDataPage::
evict_lru() {
  MutexHolder holder(_lock);

  switch (_ram_class) {
  case RC_resident:
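    // A compressed LRU with zero capacity means compression is disabled;
    // evict straight to disk.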
    if (_compressed_lru.get_max_size() == 0) {
      request_ram_class(RC_disk);
    } else {
      request_ram_class(RC_compressed);
    }
    break;

  case RC_compressed:
    request_ram_class(RC_disk);
    break;

  case RC_disk:
  case RC_end_of_list:
    gobj_cat.warning()
      << "Internal error: attempt to evict array data " << this
      << " in inappropriate state " << _ram_class << ".\n";
    break;
  }
}

/**
 * Allocates a new block. Returns NULL if a block of the requested size
 * cannot be allocated.
 *
 * To free the allocated block, call block->free(), or simply delete the block
 * pointer.
 *
 * Assumes the lock is already held.
 */
VertexDataBlock *VertexDataPage::
do_alloc(size_t size) {
  VertexDataBlock *block = (VertexDataBlock *)SimpleAllocator::do_alloc(size);

  if (block != nullptr && _ram_class != RC_disk) {
    // When we allocate a new block within a resident page, we have to clear
    // the disk cache (since we have just invalidated it).
    _saved_block.clear();
  }

  return block;
}

/**
 * Short-circuits the thread and forces the page into resident status
 * immediately.
 *
 * Intended to be called from the main thread. Assumes the lock is already
 * held.
 */
void VertexDataPage::
make_resident_now() {
  MutexHolder holder(_tlock);
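  // If a state change for this page is still queued on the paging thread,
  // cancel it; we are about to do the work ourselves.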
  if (_pending_ram_class != _ram_class) {
    nassertv(_thread_mgr != nullptr);
    _thread_mgr->remove_page(this);
  }

  make_resident();
  _pending_ram_class = RC_resident;
}

/**
 * Moves the page to fully resident status by expanding it or reading it from
 * disk as necessary.
 *
 * Intended to be called from the sub-thread. Assumes the lock is already
 * held.
 */
void VertexDataPage::
make_resident() {
  if (_ram_class == RC_resident) {
    mark_used_lru();
    return;
  }

  if (_ram_class == RC_disk) {
    do_restore_from_disk();
  }

  if (_ram_class == RC_compressed) {
#ifdef HAVE_ZLIB
    PStatTimer timer(_vdata_decompress_pcollector);

    if (gobj_cat.is_debug()) {
      gobj_cat.debug()
        << "Expanding page from " << _size
        << " to " << _uncompressed_size << "\n";
    }
    size_t new_allocated_size = round_up(_uncompressed_size);
    unsigned char *new_data = alloc_page_data(new_allocated_size);
    unsigned char *end_data = new_data + new_allocated_size;

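    // Set up a zlib inflate stream. Unless USE_MEMORY_NOWRAPPERS is defined,
    // zlib's allocations are routed through Panda's memory hooks.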
    z_stream z_source;
#ifdef USE_MEMORY_NOWRAPPERS
    z_source.zalloc = Z_NULL;
    z_source.zfree = Z_NULL;
#else
    z_source.zalloc = (alloc_func)&do_zlib_alloc;
    z_source.zfree = (free_func)&do_zlib_free;
#endif

    z_source.opaque = Z_NULL;
    z_source.msg = (char *) "no error message";

    z_source.next_in = (Bytef *)(char *)_page_data;
    z_source.avail_in = _size;
    z_source.next_out = (Bytef *)new_data;
    z_source.avail_out = new_allocated_size;

    int result = inflateInit(&z_source);
    if (result < 0) {
      nassert_raise("zlib error");
      return;
    }
    Thread::consider_yield();

    size_t output_size = 0;

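    // Inflate page-at-a-time into the new buffer. Since the uncompressed
    // size is known, new_data is already big enough for the whole result.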
    int flush = 0;
    result = 0;
    while (result != Z_STREAM_END) {
      unsigned char *start_out = (unsigned char *)z_source.next_out;
      nassertv(start_out < end_data);
      z_source.avail_out = std::min((size_t)(end_data - start_out), (size_t)inflate_page_size);
      nassertv(z_source.avail_out != 0);
      result = inflate(&z_source, flush);
      if (result < 0 && result != Z_BUF_ERROR) {
        nassert_raise("zlib error");
        return;
      }
      size_t bytes_produced = (size_t)((unsigned char *)z_source.next_out - start_out);
      output_size += bytes_produced;
      if (bytes_produced == 0) {
        // If we ever produce no bytes, then start flushing the output.
        flush = Z_FINISH;
      }

      Thread::consider_yield();
    }
    nassertv(z_source.avail_in == 0);
    nassertv(output_size == _uncompressed_size);

    result = inflateEnd(&z_source);
    nassertv(result == Z_OK);

    free_page_data(_page_data, _allocated_size);
    _page_data = new_data;
    _size = _uncompressed_size;
    _allocated_size = new_allocated_size;
#endif

    set_lru_size(_size);
    set_ram_class(RC_resident);
  }
}

/**
 * Moves the page to compressed status by compressing it or reading it from
 * disk as necessary.
 *
 * Assumes the lock is already held.
 */
void VertexDataPage::
make_compressed() {
  if (_ram_class == RC_compressed) {
    // If we're already compressed, just mark the page recently used.
    mark_used_lru();
    return;
  }

  if (_ram_class == RC_disk) {
    do_restore_from_disk();
  }

  if (_ram_class == RC_resident) {
    nassertv(_size == _uncompressed_size);

#ifdef HAVE_ZLIB
    PStatTimer timer(_vdata_compress_pcollector);

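    // The compressed output is collected in a chain of fixed-size
    // DeflatePages, since we don't know the final size until deflate is done.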
    DeflatePage *page = new DeflatePage;
    DeflatePage *head = page;

    z_stream z_dest;
#ifdef USE_MEMORY_NOWRAPPERS
    z_dest.zalloc = Z_NULL;
    z_dest.zfree = Z_NULL;
#else
    z_dest.zalloc = (alloc_func)&do_zlib_alloc;
    z_dest.zfree = (free_func)&do_zlib_free;
#endif

    z_dest.opaque = Z_NULL;
    z_dest.msg = (char *) "no error message";

    int result = deflateInit(&z_dest, vertex_data_compression_level);
    if (result < 0) {
      nassert_raise("zlib error");
      return;
    }
    Thread::consider_yield();

    z_dest.next_in = (Bytef *)(char *)_page_data;
    z_dest.avail_in = _uncompressed_size;
    size_t output_size = 0;

    // Compress the data into one or more individual pages. We have to
    // compress it page-at-a-time, since we're not really sure how big the
    // result will be (so we can't easily pre-allocate a buffer).
    int flush = 0;
    result = 0;
    while (result != Z_STREAM_END) {
      unsigned char *start_out = (page->_buffer + page->_used_size);
      z_dest.next_out = (Bytef *)start_out;
      z_dest.avail_out = (size_t)deflate_page_size - page->_used_size;
      if (z_dest.avail_out == 0) {
        DeflatePage *new_page = new DeflatePage;
        page->_next = new_page;
        page = new_page;
        start_out = page->_buffer;
        z_dest.next_out = (Bytef *)start_out;
        z_dest.avail_out = deflate_page_size;
      }

      result = deflate(&z_dest, flush);
      if (result < 0 && result != Z_BUF_ERROR) {
        nassert_raise("zlib error");
        return;
      }
      size_t bytes_produced = (size_t)((unsigned char *)z_dest.next_out - start_out);
      page->_used_size += bytes_produced;
      nassertv(page->_used_size <= deflate_page_size);
      output_size += bytes_produced;
      if (bytes_produced == 0) {
        // If we ever produce no bytes, then start flushing the output.
        flush = Z_FINISH;
      }

      Thread::consider_yield();
    }
    nassertv(z_dest.avail_in == 0);

    result = deflateEnd(&z_dest);
    nassertv(result == Z_OK);

    // Now we know how big the result will be. Allocate a buffer, and copy
    // the data from the various pages.

    size_t new_allocated_size = round_up(output_size);
    unsigned char *new_data = alloc_page_data(new_allocated_size);

    size_t copied_size = 0;
    unsigned char *p = new_data;
    page = head;
    while (page != nullptr) {
      memcpy(p, page->_buffer, page->_used_size);
      copied_size += page->_used_size;
      p += page->_used_size;
      DeflatePage *next = page->_next;
      delete page;
      page = next;
    }
    nassertv(copied_size == output_size);

    // Now free the original, uncompressed data, and put this new compressed
    // buffer in its place.
    free_page_data(_page_data, _allocated_size);
    _page_data = new_data;
    _size = output_size;
    _allocated_size = new_allocated_size;

    if (gobj_cat.is_debug()) {
      gobj_cat.debug()
        << "Compressed " << *this << " from " << _uncompressed_size
        << " to " << _size << "\n";
    }
#endif
    set_lru_size(_size);
    set_ram_class(RC_compressed);
  }
}

/**
 * Moves the page to disk status by writing it to disk as necessary.
 *
 * Assumes the lock is already held.
 */
void VertexDataPage::
make_disk() {
  if (_ram_class == RC_disk) {
    // If we're already on disk, just mark the page recently used.
    mark_used_lru();
    return;
  }

  if (_ram_class == RC_resident || _ram_class == RC_compressed) {
    if (!do_save_to_disk()) {
      // Can't save it to disk for some reason.
      gobj_cat.warning()
        << "Couldn't save page " << this << " to disk.\n";
      mark_used_lru();
      return;
    }

    free_page_data(_page_data, _allocated_size);
    _page_data = nullptr;
    _size = 0;

    set_ram_class(RC_disk);
  }
}

/**
 * Writes the page to disk, but does not evict it from memory or affect its
 * LRU status. If it gets evicted later without having been modified, it will
 * not need to write itself to disk again.
 *
 * Returns true on success, false on failure. Assumes the lock is already
 * held.
 */
bool VertexDataPage::
do_save_to_disk() {
  if (_ram_class == RC_resident || _ram_class == RC_compressed) {
    PStatTimer timer(_vdata_save_pcollector);

    if (_saved_block == nullptr) {
      if (gobj_cat.is_debug()) {
        gobj_cat.debug()
          << "Storing page, " << _size << " bytes, to disk\n";
      }

      bool compressed = (_ram_class == RC_compressed);

      _saved_block = get_save_file()->write_data(_page_data, _allocated_size, compressed);
      if (_saved_block == nullptr) {
        // Can't write it to disk. Too bad.
        return false;
      }
    } else {
      if (gobj_cat.is_debug()) {
        gobj_cat.debug()
          << "Page already stored: " << _size << " bytes\n";
      }
    }
  }

  return true;
}

/**
 * Restores the page from disk and makes it either compressed or resident
 * (according to whether it was stored compressed on disk).
 *
 * Assumes the lock is already held.
 */
void VertexDataPage::
do_restore_from_disk() {
  if (_ram_class == RC_disk) {
    nassertv(_saved_block != nullptr);
    nassertv(_page_data == nullptr && _size == 0);

    PStatTimer timer(_vdata_restore_pcollector);

    size_t buffer_size = _saved_block->get_size();
    if (gobj_cat.is_debug()) {
      gobj_cat.debug()
        << "Restoring page, " << buffer_size << " bytes, from disk\n";
    }

    size_t new_allocated_size = round_up(buffer_size);
    unsigned char *new_data = alloc_page_data(new_allocated_size);
    if (!get_save_file()->read_data(new_data, new_allocated_size, _saved_block)) {
      nassert_raise("read error");
    }

    nassertv(_page_data == nullptr);
    _page_data = new_data;
    _size = buffer_size;
    _allocated_size = new_allocated_size;

    set_lru_size(_size);
    if (_saved_block->get_compressed()) {
      set_ram_class(RC_compressed);
    } else {
      set_ram_class(RC_resident);
    }
  }
}

/**
 * Called when the "book size"--the size of the page as recorded in its book's
 * table--has changed for some reason. Assumes the lock is held.
 */
void VertexDataPage::
adjust_book_size() {
  size_t new_size = _contiguous;
  if (_ram_class != RC_resident) {
    // Let's not attempt to allocate new buffers from non-resident pages.
    new_size = 0;
  }

  if (_book != nullptr && new_size != _book_size) {
    VertexDataBook::Pages::iterator pi = _book->_pages.find(this);
    nassertv(pi != _book->_pages.end());
    _book->_pages.erase(pi);

    _book_size = new_size;
    bool inserted = _book->_pages.insert(this).second;
    nassertv(inserted);
  }
}

/**
 * Requests the thread set the page to the indicated ram class (if we are
 * using threading). The page will be enqueued in the thread, which will
 * eventually be responsible for setting the requested ram class.
 *
 * Assumes the page's lock is already held.
 */
void VertexDataPage::
request_ram_class(RamClass ram_class) {
  int num_threads = vertex_data_page_threads;
  if (num_threads == 0 || !Thread::is_threading_supported()) {
    // No threads. Do it immediately.
    switch (ram_class) {
    case RC_resident:
      make_resident();
      break;

    case RC_compressed:
      make_compressed();
      break;

    case RC_disk:
      make_disk();
      break;

    case RC_end_of_list:
      break;
    }
    _pending_ram_class = ram_class;
    return;
  }

  MutexHolder holder(_tlock);
  if (_thread_mgr == nullptr) {
    // Create the thread manager.
    gobj_cat.info()
      << "Spawning " << num_threads << " vertex paging threads.\n";
    _thread_mgr = new PageThreadManager(num_threads);
  }

  _thread_mgr->add_page(this, ram_class);
}

/**
 * Creates the global VertexDataSaveFile that will be used to save vertex data
 * buffers to disk when necessary.
 */
void VertexDataPage::
make_save_file() {
  size_t max_size = (size_t)max_disk_vertex_data;

  _save_file = new VertexDataSaveFile(vertex_save_file_directory,
                                      vertex_save_file_prefix, max_size);
}

/**
 * Allocates and returns a freshly-allocated buffer of at least the indicated
 * size for holding vertex data.
 */
unsigned char *VertexDataPage::
alloc_page_data(size_t page_size) const {
  _alloc_pages_pcollector.add_level_now(page_size);
  return (unsigned char *)memory_hook->mmap_alloc(page_size, false);
}

/**
 * Releases a buffer allocated via alloc_page_data().
 */
void VertexDataPage::
free_page_data(unsigned char *page_data, size_t page_size) const {
  _alloc_pages_pcollector.sub_level_now(page_size);
  memory_hook->mmap_free(page_data, page_size);
}

/**
 * Assumes _tlock is held.
 */
VertexDataPage::PageThreadManager::
PageThreadManager(int num_threads) :
  _shutdown(false),
  _pending_cvar(_tlock)
{
  start_threads(num_threads);
}

/**
 * Enqueues the indicated page on the thread queue to convert it to the
 * specified ram class.
 *
 * It is assumed the page's lock is already held, and that _tlock is already
 * held.
 */
void VertexDataPage::PageThreadManager::
add_page(VertexDataPage *page, RamClass ram_class) {
  nassertv(!_shutdown);

  if (page->_pending_ram_class == ram_class) {
    // It's already queued.
    nassertv(page->get_lru() == &_pending_lru);
    return;
  }

  if (page->_pending_ram_class != page->_ram_class) {
    // It's already queued, but for a different ram class. Dequeue it so we
    // can requeue it.
    remove_page(page);
  }

  if (page->_pending_ram_class != ram_class) {
    // First, move the page to the "pending" LRU. When it eventually gets its
    // requested ram class set, it will be requeued on the appropriate live
    // LRU.
    page->mark_used_lru(&_pending_lru);

    page->_pending_ram_class = ram_class;
    if (ram_class == RC_resident) {
      _pending_reads.push_back(page);
    } else {
      _pending_writes.push_back(page);
    }
    _pending_cvar.notify();
  }
}

/**
 * Dequeues the indicated page and removes it from the pending task list.
 *
 * It is assumed the page's lock is already held, and that _tlock is already
 * held.
 */
void VertexDataPage::PageThreadManager::
remove_page(VertexDataPage *page) {
  nassertv(page != nullptr);

  PageThreads::iterator ti;
  for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
    PageThread *thread = (*ti);
    if (page == thread->_working_page) {
      // Oops, this thread is currently working on this one. We'll have to
      // wait for the thread to finish.
      page->_lock.release();
      while (page == thread->_working_page) {
        thread->_working_cvar.wait();
      }
      page->_lock.acquire();
      return;
    }
  }

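  // The page is still sitting in a queue; pull it from whichever pending
  // list matches its requested ram class.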
  if (page->_pending_ram_class == RC_resident) {
    PendingPages::iterator pi =
      find(_pending_reads.begin(), _pending_reads.end(), page);
    nassertv(pi != _pending_reads.end());
    _pending_reads.erase(pi);
  } else {
    PendingPages::iterator pi =
      find(_pending_writes.begin(), _pending_writes.end(), page);
    nassertv(pi != _pending_writes.end());
    _pending_writes.erase(pi);
  }

  page->_pending_ram_class = page->_ram_class;

  // Put the page back on its proper LRU.
  page->mark_used_lru(_global_lru[page->_ram_class]);
}

/**
 * Returns the number of threads active on the thread manager. Assumes _tlock
 * is held.
 */
int VertexDataPage::PageThreadManager::
get_num_threads() const {
  return (int)_threads.size();
}

/**
 * Returns the number of read requests waiting on the queue. Assumes _tlock
 * is held.
 */
int VertexDataPage::PageThreadManager::
get_num_pending_reads() const {
  return (int)_pending_reads.size();
}

/**
 * Returns the number of write requests waiting on the queue. Assumes _tlock
 * is held.
 */
int VertexDataPage::PageThreadManager::
get_num_pending_writes() const {
  return (int)_pending_writes.size();
}

/**
 * Adds the indicated number of threads to the list of active threads.
 * Assumes _tlock is held.
 */
void VertexDataPage::PageThreadManager::
start_threads(int num_threads) {
  _shutdown = false;

  _threads.reserve(num_threads);
  for (int i = 0; i < num_threads; ++i) {
    std::ostringstream name_strm;
    name_strm << "VertexDataPage" << _threads.size();
    PT(PageThread) thread = new PageThread(this, name_strm.str());
    thread->start(TP_low, true);
    _threads.push_back(thread);
  }
}

/**
 * Signals all the threads to stop and waits for them. Does not return until
 * the threads have finished. Assumes _tlock is *not* held.
 */
void VertexDataPage::PageThreadManager::
stop_threads() {
  PageThreads threads;
  {
    MutexHolder holder(_tlock);
    _shutdown = true;
    _pending_cvar.notify_all();
    threads.swap(_threads);
  }

  PageThreads::iterator ti;
  for (ti = threads.begin(); ti != threads.end(); ++ti) {
    PageThread *thread = (*ti);
    thread->join();
  }

  nassertv(_pending_reads.empty() && _pending_writes.empty());
}

/**
 *
 */
VertexDataPage::PageThread::
PageThread(PageThreadManager *manager, const std::string &name) :
  Thread(name, name),
  _manager(manager),
  _working_cvar(_tlock)
{
}

/**
 * The main processing loop for each sub-thread.
 */
void VertexDataPage::PageThread::
thread_main() {
  _tlock.acquire();

  while (true) {
    PStatClient::thread_tick(get_sync_name());

    while (_manager->_pending_reads.empty() &&
           _manager->_pending_writes.empty()) {
      if (_manager->_shutdown) {
        _tlock.release();
        return;
      }
      PStatTimer timer(_thread_wait_pcollector);
      _manager->_pending_cvar.wait();
    }

    // Reads always have priority.
    if (!_manager->_pending_reads.empty()) {
      _working_page = _manager->_pending_reads.front();
      _manager->_pending_reads.pop_front();
    } else {
      _working_page = _manager->_pending_writes.front();
      _manager->_pending_writes.pop_front();
    }

    RamClass ram_class = _working_page->_pending_ram_class;
    _tlock.release();

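    // Perform the conversion without holding _tlock, so that other pages can
    // be queued in the meantime.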
    {
      MutexHolder holder(_working_page->_lock);
      switch (ram_class) {
      case RC_resident:
        _working_page->make_resident();
        break;

      case RC_compressed:
        _working_page->make_compressed();
        break;

      case RC_disk:
        _working_page->make_disk();
        break;

      case RC_end_of_list:
        break;
      }
    }

    _tlock.acquire();

    _working_page = nullptr;
    _working_cvar.notify();

    Thread::consider_yield();
  }
}