00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "vertexDataPage.h"
00016 #include "configVariableInt.h"
00017 #include "vertexDataSaveFile.h"
00018 #include "vertexDataBook.h"
00019 #include "pStatTimer.h"
00020 #include "memoryHook.h"
00021
00022 #ifdef HAVE_ZLIB
00023 #include <zlib.h>
00024 #endif
00025
// Configuration variables that size the three-tier vertex-data paging
// system (resident RAM -> compressed RAM -> disk).

ConfigVariableInt max_resident_vertex_data
("max-resident-vertex-data", -1,
 PRC_DESC("Specifies the maximum number of bytes of all vertex data "
          "that is allowed to remain resident in system RAM at one time. "
          "If more than this number of bytes of vertices are created, "
          "the least-recently-used ones will be temporarily compressed in "
          "system RAM until they are needed.  Set it to -1 for no limit."));

ConfigVariableInt max_compressed_vertex_data
("max-compressed-vertex-data", 0,
 PRC_DESC("Specifies the maximum number of bytes of all vertex data "
          "that is allowed to remain compressed in system RAM at one time. "
          "If more than this number of bytes of vertices are created, "
          "the least-recently-used ones will be temporarily flushed to "
          "disk until they are needed.  Set it to -1 for no limit."));

ConfigVariableInt vertex_data_compression_level
("vertex-data-compression-level", 1,
 PRC_DESC("Specifies the zlib compression level to use when compressing "
          "vertex data.  The number should be in the range 1 to 9, where "
          "larger values are slower but give better compression."));

ConfigVariableInt max_disk_vertex_data
("max-disk-vertex-data", -1,
 PRC_DESC("Specifies the maximum number of bytes of vertex data "
          "that is allowed to be written to disk.  Set it to -1 for no "
          "limit."));
00053
// The manager object for the background paging threads, if any have been
// spawned.  Accessed under _tlock.
PT(VertexDataPage::PageThreadManager) VertexDataPage::_thread_mgr;

// Protects _thread_mgr and the paging queues.  Allocated on the heap and
// deliberately never freed, so the reference stays valid even during
// static destruction.
Mutex &VertexDataPage::_tlock = *(new Mutex("VertexDataPage::_tlock"));

// One LRU per ram class, sized from the config variables above, plus a
// holding LRU for pages that have a state change pending in a paging
// thread.  (The disk and pending LRUs are created with max size 0;
// presumably SimpleLru treats that as "never auto-evict" -- confirm
// against SimpleLru's semantics.)
SimpleLru VertexDataPage::_resident_lru("resident", max_resident_vertex_data);
SimpleLru VertexDataPage::_compressed_lru("compressed", max_compressed_vertex_data);
SimpleLru VertexDataPage::_disk_lru("disk", 0);
SimpleLru VertexDataPage::_pending_lru("pending", 0);

// Lookup table from RamClass to the corresponding LRU, in enum order.
SimpleLru *VertexDataPage::_global_lru[RC_end_of_list] = {
  &VertexDataPage::_resident_lru,
  &VertexDataPage::_compressed_lru,
  &VertexDataPage::_disk_lru,
};

// The on-disk backing store, created lazily by make_save_file().
VertexDataSaveFile *VertexDataPage::_save_file;

// Dummy mutex used by the size-only constructor, which never actually
// allocates anything.
Mutex VertexDataPage::_unused_mutex;

// PStats instrumentation for the various paging operations.
PStatCollector VertexDataPage::_vdata_compress_pcollector("*:Vertex Data:Compress");
PStatCollector VertexDataPage::_vdata_decompress_pcollector("*:Vertex Data:Decompress");
PStatCollector VertexDataPage::_vdata_save_pcollector("*:Vertex Data:Save");
PStatCollector VertexDataPage::_vdata_restore_pcollector("*:Vertex Data:Restore");
PStatCollector VertexDataPage::_thread_wait_pcollector("Wait:Idle");
PStatCollector VertexDataPage::_alloc_pages_pcollector("System memory:MMap:Vertex data");

TypeHandle VertexDataPage::_type_handle;
TypeHandle VertexDataPage::DeflatePage::_type_handle;
00087
#if defined(HAVE_ZLIB) && !defined(USE_MEMORY_NOWRAPPERS)
// Custom zlib memory hooks: route zlib's internal allocations through
// Panda's memory wrappers so they are tracked like any other allocation.
static void *
do_zlib_alloc(voidpf opaque, uInt items, uInt size) {
  return PANDA_MALLOC_ARRAY(items * size);
}
static void
do_zlib_free(voidpf opaque, voidpf address) {
  PANDA_FREE_ARRAY(address);
}
#endif  // HAVE_ZLIB && !USE_MEMORY_NOWRAPPERS
00099
00100
00101
00102
00103
00104
00105
00106
00107
/**
 * Constructs a page with only a size and no allocated memory.  It is not
 * linked to any book (_book is NULL) and allocates no page data;
 * presumably this form serves as a size-based search key for the book's
 * page set -- confirm against VertexDataBook's lookup code.
 */
VertexDataPage::
VertexDataPage(size_t book_size) :
  SimpleAllocator(book_size, _unused_mutex),
  SimpleLruPage(book_size),
  _book_size(book_size),
  _block_size(0),
  _book(NULL)
{
  _page_data = NULL;
  _size = 0;
  _uncompressed_size = 0;
  _ram_class = RC_resident;
  _pending_ram_class = RC_resident;
}
00122
00123
00124
00125
00126
00127
/**
 * Constructs a real page belonging to the indicated book, with page_size
 * bytes of freshly mmap'ed, resident memory (rounded up to the allocator's
 * granularity) subdivided into blocks of block_size.
 */
VertexDataPage::
VertexDataPage(VertexDataBook *book, size_t page_size, size_t block_size) :
  SimpleAllocator(page_size, book->_lock),
  SimpleLruPage(page_size),
  _book_size(page_size),
  _block_size(block_size),
  _book(book)
{
  // The actual allocation may be larger than requested, to satisfy
  // round_up()'s granularity.
  _allocated_size = round_up(page_size);
  _page_data = alloc_page_data(_allocated_size);
  _size = page_size;

  _uncompressed_size = _size;
  _pending_ram_class = RC_resident;
  set_ram_class(RC_resident);
}
00144
00145
00146
00147
00148
00149
/**
 * Destructor.  Withdraws any pending paging request for this page from the
 * thread manager before freeing the page memory.  The _book pointer must
 * already have been cleared (see changed_contiguous(), which NULLs it
 * before deleting the page).
 */
VertexDataPage::
~VertexDataPage() {

  {
    MutexHolder holder2(_tlock);
    if (_pending_ram_class != _ram_class) {
      // A state change is still queued (or in flight) for this page; pull
      // it out of the thread manager's queues so no worker touches a
      // deleted page.
      nassertv(_thread_mgr != (PageThreadManager *)NULL);
      _thread_mgr->remove_page(this);
    }
  }

  if (_page_data != NULL) {
    free_page_data(_page_data, _allocated_size);
    _size = 0;
  }

  nassertv(_book == NULL);
}
00172
00173
00174
00175
00176
00177
00178
00179
/**
 * Stops any running vertex paging threads and joins them.  The manager
 * pointer is detached under _tlock first, so stop_threads() itself (which
 * blocks on the join) runs without holding the lock.
 */
void VertexDataPage::
stop_threads() {
  PT(PageThreadManager) thread_mgr;
  {
    MutexHolder holder(_tlock);
    thread_mgr = _thread_mgr;
    _thread_mgr.clear();
  }

  if (thread_mgr != (PageThreadManager *)NULL) {
    gobj_cat.info()
      << "Stopping vertex paging threads.\n";
    thread_mgr->stop_threads();
  }
}
00195
00196
00197
00198
00199
00200
00201
/**
 * Waits for all pending paging requests to be serviced by stopping the
 * worker threads (which drains the queues as they exit), then restarts
 * them if paging threads are still configured.  With
 * vertex-data-page-threads set to 0, this simply stops the threads.
 */
void VertexDataPage::
flush_threads() {
  int num_threads = vertex_data_page_threads;
  if (num_threads == 0) {
    stop_threads();
    return;
  }

  PT(PageThreadManager) thread_mgr;
  {
    MutexHolder holder(_tlock);
    thread_mgr = _thread_mgr;
  }

  if (thread_mgr != (PageThreadManager *)NULL) {
    // Joining the threads flushes any queued reads/writes.
    thread_mgr->stop_threads();
    MutexHolder holder(_tlock);
    thread_mgr->start_threads(num_threads);
  }
}
00222
00223
00224
00225
00226
00227
/**
 * Outputs a one-line description of the page (delegates to the allocator's
 * output).
 */
void VertexDataPage::
output(ostream &out) const {
  SimpleAllocator::output(out);
}
00232
00233
00234
00235
00236
00237
/**
 * Writes a multi-line description of the page (delegates to the
 * allocator's write).  NOTE(review): indent_level is currently ignored --
 * confirm whether SimpleAllocator::write() should receive it.
 */
void VertexDataPage::
write(ostream &out, int indent_level) const {
  SimpleAllocator::write(out);
}
00242
00243
00244
00245
00246
00247
00248
/**
 * Allocator factory hook: creates a VertexDataBlock (rather than a plain
 * SimpleAllocatorBlock) to represent an allocation within this page.
 */
SimpleAllocatorBlock *VertexDataPage::
make_block(size_t start, size_t size) {
  return new VertexDataBlock(this, start, size);
}
00253
00254
00255
00256
00257
00258
00259
00260
/**
 * Called by the allocator whenever the size of the largest contiguous free
 * region changes.  If the page has become completely empty it removes
 * itself from its book and self-destructs; otherwise it re-sorts itself in
 * the book by its new contiguous size.
 */
void VertexDataPage::
changed_contiguous() {
  if (do_is_empty()) {
    // No blocks remain; unlink from the book and delete ourselves.  Note
    // the `delete this` -- nothing may touch the page afterwards.
    VertexDataBook::Pages::iterator pi = _book->_pages.find(this);
    nassertv(pi != _book->_pages.end());
    _book->_pages.erase(pi);
    _book = NULL;
    delete this;
    return;
  }

  adjust_book_size();
}
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291 void VertexDataPage::
00292 evict_lru() {
00293 MutexHolder holder(_lock);
00294
00295 switch (_ram_class) {
00296 case RC_resident:
00297 if (_compressed_lru.get_max_size() == 0) {
00298 request_ram_class(RC_disk);
00299 } else {
00300 request_ram_class(RC_compressed);
00301 }
00302 break;
00303
00304 case RC_compressed:
00305 request_ram_class(RC_disk);
00306 break;
00307
00308 case RC_disk:
00309 case RC_end_of_list:
00310 gobj_cat.warning()
00311 << "Internal error: attempt to evict array data " << this
00312 << " in inappropriate state " << _ram_class << ".\n";
00313 break;
00314 }
00315 }
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
/**
 * Allocates a new block of the given size from the page, returning NULL if
 * it does not fit.  Assumes the page's lock is held (as for the base-class
 * do_alloc).
 */
VertexDataBlock *VertexDataPage::
do_alloc(size_t size) {
  VertexDataBlock *block = (VertexDataBlock *)SimpleAllocator::do_alloc(size);

  if (block != (VertexDataBlock *)NULL && _ram_class != RC_disk) {
    // The in-RAM contents just changed, so any copy previously saved to
    // disk is now stale; drop it so it will be rewritten on the next
    // save.
    _saved_block.clear();
  }

  return block;
}
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
/**
 * Synchronously brings the page fully into resident RAM, bypassing the
 * paging threads: any queued state change for this page is withdrawn
 * first, then make_resident() runs on the calling thread.
 */
void VertexDataPage::
make_resident_now() {
  MutexHolder holder(_tlock);
  if (_pending_ram_class != _ram_class) {
    // A change is queued with the thread manager; cancel it so we do the
    // work ourselves instead.
    nassertv(_thread_mgr != (PageThreadManager *)NULL);
    _thread_mgr->remove_page(this);
  }

  make_resident();
  _pending_ram_class = RC_resident;
}
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
/**
 * Moves the page to fully resident status, reading it back from disk
 * and/or zlib-decompressing it as needed.  Callers in this file hold the
 * page's _lock when invoking this (see thread_main / evict_lru paths).
 */
void VertexDataPage::
make_resident() {
  if (_ram_class == RC_resident) {
    // Already resident; just refresh our LRU position.
    mark_used_lru();
    return;
  }

  if (_ram_class == RC_disk) {
    // Restoring from disk yields either RC_resident or RC_compressed,
    // depending on how the page was saved.
    do_restore_from_disk();
  }

  if (_ram_class == RC_compressed) {
#ifdef HAVE_ZLIB
    PStatTimer timer(_vdata_decompress_pcollector);

    if (gobj_cat.is_debug()) {
      gobj_cat.debug()
        << "Expanding page from " << _size
        << " to " << _uncompressed_size << "\n";
    }
    size_t new_allocated_size = round_up(_uncompressed_size);
    unsigned char *new_data = alloc_page_data(new_allocated_size);
    unsigned char *end_data = new_data + new_allocated_size;

    z_stream z_source;
#ifdef USE_MEMORY_NOWRAPPERS
    z_source.zalloc = Z_NULL;
    z_source.zfree = Z_NULL;
#else
    z_source.zalloc = (alloc_func)&do_zlib_alloc;
    z_source.zfree = (free_func)&do_zlib_free;
#endif

    z_source.opaque = Z_NULL;
    z_source.msg = (char *) "no error message";

    z_source.next_in = (Bytef *)(char *)_page_data;
    z_source.avail_in = _size;
    z_source.next_out = (Bytef *)new_data;
    z_source.avail_out = new_allocated_size;

    int result = inflateInit(&z_source);
    if (result < 0) {
      nassert_raise("zlib error");
      return;
    }
    Thread::consider_yield();

    size_t output_size = 0;

    // Inflate in inflate_page_size chunks, yielding between chunks so a
    // paging thread doesn't hog the CPU.
    int flush = 0;
    result = 0;
    while (result != Z_STREAM_END) {
      unsigned char *start_out = (unsigned char *)z_source.next_out;
      nassertv(start_out < end_data);
      z_source.avail_out = min((size_t)(end_data - start_out), (size_t)inflate_page_size);
      nassertv(z_source.avail_out != 0);
      result = inflate(&z_source, flush);
      if (result < 0 && result != Z_BUF_ERROR) {
        nassert_raise("zlib error");
        return;
      }
      size_t bytes_produced = (size_t)((unsigned char *)z_source.next_out - start_out);
      output_size += bytes_produced;
      if (bytes_produced == 0) {
        // No progress this pass; ask zlib to finish up the stream.
        flush = Z_FINISH;
      }

      Thread::consider_yield();
    }
    nassertv(z_source.avail_in == 0);
    nassertv(output_size == _uncompressed_size);

    result = inflateEnd(&z_source);
    nassertv(result == Z_OK);

    // Swap the decompressed buffer in for the compressed one.
    free_page_data(_page_data, _allocated_size);
    _page_data = new_data;
    _size = _uncompressed_size;
    _allocated_size = new_allocated_size;
#endif

    set_lru_size(_size);
    set_ram_class(RC_resident);
  }
}
00458
00459
00460
00461
00462
00463
00464
00465
00466
/**
 * Moves the page to compressed status (zlib-deflated in RAM), restoring it
 * from disk first if necessary.  Without HAVE_ZLIB, the page is simply
 * relabeled RC_compressed with its data unchanged.  Callers in this file
 * hold the page's _lock when invoking this.
 */
void VertexDataPage::
make_compressed() {
  if (_ram_class == RC_compressed) {
    // Already compressed; just refresh our LRU position.
    mark_used_lru();
    return;
  }

  if (_ram_class == RC_disk) {
    do_restore_from_disk();
  }

  if (_ram_class == RC_resident) {
    nassertv(_size == _uncompressed_size);

#ifdef HAVE_ZLIB
    PStatTimer timer(_vdata_compress_pcollector);

    // Deflate into a linked chain of fixed-size DeflatePages, since the
    // final compressed size isn't known in advance.
    DeflatePage *page = new DeflatePage;
    DeflatePage *head = page;

    z_stream z_dest;
#ifdef USE_MEMORY_NOWRAPPERS
    z_dest.zalloc = Z_NULL;
    z_dest.zfree = Z_NULL;
#else
    z_dest.zalloc = (alloc_func)&do_zlib_alloc;
    z_dest.zfree = (free_func)&do_zlib_free;
#endif

    z_dest.opaque = Z_NULL;
    z_dest.msg = (char *) "no error message";

    int result = deflateInit(&z_dest, vertex_data_compression_level);
    if (result < 0) {
      nassert_raise("zlib error");
      return;
    }
    Thread::consider_yield();

    z_dest.next_in = (Bytef *)(char *)_page_data;
    z_dest.avail_in = _uncompressed_size;
    size_t output_size = 0;

    // Compress in chunks, yielding between chunks so a paging thread
    // doesn't hog the CPU.
    int flush = 0;
    result = 0;
    while (result != Z_STREAM_END) {
      unsigned char *start_out = (page->_buffer + page->_used_size);
      z_dest.next_out = (Bytef *)start_out;
      z_dest.avail_out = (size_t)deflate_page_size - page->_used_size;
      if (z_dest.avail_out == 0) {
        // Current chunk is full; chain on a fresh one.
        DeflatePage *new_page = new DeflatePage;
        page->_next = new_page;
        page = new_page;
        start_out = page->_buffer;
        z_dest.next_out = (Bytef *)start_out;
        z_dest.avail_out = deflate_page_size;
      }

      result = deflate(&z_dest, flush);
      if (result < 0 && result != Z_BUF_ERROR) {
        nassert_raise("zlib error");
        return;
      }
      size_t bytes_produced = (size_t)((unsigned char *)z_dest.next_out - start_out);
      page->_used_size += bytes_produced;
      nassertv(page->_used_size <= deflate_page_size);
      output_size += bytes_produced;
      if (bytes_produced == 0) {
        // No progress this pass; ask zlib to finish up the stream.
        flush = Z_FINISH;
      }

      Thread::consider_yield();
    }
    nassertv(z_dest.avail_in == 0);

    result = deflateEnd(&z_dest);
    nassertv(result == Z_OK);

    // Now that the compressed size is known, copy the chained chunks into
    // a single contiguous buffer and free the chain.
    size_t new_allocated_size = round_up(output_size);
    unsigned char *new_data = alloc_page_data(new_allocated_size);

    size_t copied_size = 0;
    unsigned char *p = new_data;
    page = head;
    while (page != NULL) {
      memcpy(p, page->_buffer, page->_used_size);
      copied_size += page->_used_size;
      p += page->_used_size;
      DeflatePage *next = page->_next;
      delete page;
      page = next;
    }
    nassertv(copied_size == output_size);

    // Swap the compressed buffer in for the uncompressed one.
    free_page_data(_page_data, _allocated_size);
    _page_data = new_data;
    _size = output_size;
    _allocated_size = new_allocated_size;

    if (gobj_cat.is_debug()) {
      gobj_cat.debug()
        << "Compressed " << *this << " from " << _uncompressed_size
        << " to " << _size << "\n";
    }
#endif
    set_lru_size(_size);
    set_ram_class(RC_compressed);
  }
}
00587
00588
00589
00590
00591
00592
00593
00594
00595
/**
 * Moves the page to disk status: writes its current (resident or
 * compressed) contents to the save file and releases the RAM copy.  If the
 * write fails, the page stays in its current class and is merely
 * re-marked used on its LRU.
 */
void VertexDataPage::
make_disk() {
  if (_ram_class == RC_disk) {
    // Already on disk; just refresh our LRU position.
    mark_used_lru();
    return;
  }

  if (_ram_class == RC_resident || _ram_class == RC_compressed) {
    if (!do_save_to_disk()) {
      // Couldn't write it (disk full, perhaps); leave the page where it
      // is rather than losing the data.
      gobj_cat.warning()
        << "Couldn't save page " << this << " to disk.\n";
      mark_used_lru();
      return;
    }

    free_page_data(_page_data, _allocated_size);
    _page_data = NULL;
    _size = 0;

    set_ram_class(RC_disk);
  }
}
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
/**
 * Writes the page's current contents to the save file, unless an
 * up-to-date saved copy already exists (the saved copy is invalidated in
 * do_alloc() when the data changes).  Returns false if the write failed,
 * e.g. for lack of disk space; the RAM copy is not freed here.
 */
bool VertexDataPage::
do_save_to_disk() {
  if (_ram_class == RC_resident || _ram_class == RC_compressed) {
    PStatTimer timer(_vdata_save_pcollector);

    if (_saved_block == (VertexDataSaveBlock *)NULL) {
      if (gobj_cat.is_debug()) {
        gobj_cat.debug()
          << "Storing page, " << _size << " bytes, to disk\n";
      }

      // Remember whether the saved image is deflated, so the restore path
      // knows which ram class to assign.
      bool compressed = (_ram_class == RC_compressed);

      _saved_block = get_save_file()->write_data(_page_data, _allocated_size, compressed);
      if (_saved_block == (VertexDataSaveBlock *)NULL) {
        // Write failed (disk full?); report failure to the caller.
        return false;
      }
    } else {
      if (gobj_cat.is_debug()) {
        gobj_cat.debug()
          << "Page already stored: " << _size << " bytes\n";
      }
    }
  }

  return true;
}
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669
/**
 * Reads the page's contents back from the save file into a fresh buffer,
 * leaving the page in RC_compressed or RC_resident according to how it was
 * saved.  The saved block is retained, so a later save is free if the data
 * hasn't changed.  NOTE(review): _size is set to the saved block's size,
 * which came from the rounded-up _allocated_size at save time -- confirm
 * this round-trips correctly for compressed pages.
 */
void VertexDataPage::
do_restore_from_disk() {
  if (_ram_class == RC_disk) {
    nassertv(_saved_block != (VertexDataSaveBlock *)NULL);
    nassertv(_page_data == (unsigned char *)NULL && _size == 0);

    PStatTimer timer(_vdata_restore_pcollector);

    size_t buffer_size = _saved_block->get_size();
    if (gobj_cat.is_debug()) {
      gobj_cat.debug()
        << "Restoring page, " << buffer_size << " bytes, from disk\n";
    }

    size_t new_allocated_size = round_up(buffer_size);
    unsigned char *new_data = alloc_page_data(new_allocated_size);
    if (!get_save_file()->read_data(new_data, new_allocated_size, _saved_block)) {
      nassert_raise("read error");
    }

    nassertv(_page_data == (unsigned char *)NULL);
    _page_data = new_data;
    _size = buffer_size;
    _allocated_size = new_allocated_size;

    set_lru_size(_size);
    if (_saved_block->get_compressed()) {
      set_ram_class(RC_compressed);
    } else {
      set_ram_class(RC_resident);
    }
  }
}
00703
00704
00705
00706
00707
00708
00709
00710
/**
 * Re-sorts this page within its book's page set according to its current
 * largest contiguous free block (_book_size).  A non-resident page reports
 * size 0 so new allocations prefer pages that are already in RAM.
 */
void VertexDataPage::
adjust_book_size() {
  size_t new_size = _contiguous;
  if (_ram_class != RC_resident) {
    // Don't advertise free space on paged-out pages.
    new_size = 0;
  }

  if (_book != (VertexDataBook *)NULL && new_size != _book_size) {
    // The set is ordered by _book_size, so the page must be removed and
    // re-inserted for the new key to take effect.
    VertexDataBook::Pages::iterator pi = _book->_pages.find(this);
    nassertv(pi != _book->_pages.end());
    _book->_pages.erase(pi);

    _book_size = new_size;
    bool inserted = _book->_pages.insert(this).second;
    nassertv(inserted);
  }
}
00730
00731
00732
00733
00734
00735
00736
00737
00738
00739
00740
/**
 * Requests that this page transition to the indicated ram class.  With no
 * paging threads configured (or no threading support), the change is
 * performed immediately on the calling thread; otherwise it is queued with
 * the PageThreadManager, spawning the worker threads on first use.
 * Callers in this file hold the page's _lock (see evict_lru).
 */
void VertexDataPage::
request_ram_class(RamClass ram_class) {
  int num_threads = vertex_data_page_threads;
  if (num_threads == 0 || !Thread::is_threading_supported()) {
    // Synchronous path: do the work right now.
    switch (ram_class) {
    case RC_resident:
      make_resident();
      break;

    case RC_compressed:
      make_compressed();
      break;

    case RC_disk:
      make_disk();
      break;

    case RC_end_of_list:
      break;
    }
    _pending_ram_class = ram_class;
    return;
  }

  MutexHolder holder(_tlock);
  if (_thread_mgr == (PageThreadManager *)NULL) {
    // First asynchronous request: spin up the paging threads.
    gobj_cat.info()
      << "Spawning " << num_threads << " vertex paging threads.\n";
    _thread_mgr = new PageThreadManager(num_threads);
  }

  _thread_mgr->add_page(this, ram_class);
}
00776
00777
00778
00779
00780
00781
00782
00783
/**
 * Creates the global on-disk save file used to back paged-out vertex data,
 * sized by the max-disk-vertex-data config variable.
 */
void VertexDataPage::
make_save_file() {
  size_t max_size = (size_t)max_disk_vertex_data;

  _save_file = new VertexDataSaveFile(vertex_save_file_directory,
                                      vertex_save_file_prefix, max_size);
}
00791
00792
00793
00794
00795
00796
00797
00798 unsigned char *VertexDataPage::
00799 alloc_page_data(size_t page_size) const {
00800 _alloc_pages_pcollector.add_level_now(page_size);
00801 return (unsigned char *)memory_hook->mmap_alloc(page_size, false);
00802 }
00803
00804
00805
00806
00807
00808
/**
 * Releases a buffer obtained from alloc_page_data(), balancing the PStats
 * level it added.
 */
void VertexDataPage::
free_page_data(unsigned char *page_data, size_t page_size) const {
  _alloc_pages_pcollector.sub_level_now(page_size);
  memory_hook->mmap_free(page_data, page_size);
}
00814
00815
00816
00817
00818
00819
/**
 * Constructs the manager and immediately starts the requested number of
 * paging threads.  The condition variable shares the global _tlock, which
 * also guards the pending queues.
 */
VertexDataPage::PageThreadManager::
PageThreadManager(int num_threads) :
  _shutdown(false),
  _pending_cvar(_tlock)
{
  start_threads(num_threads);
}
00827
00828
00829
00830
00831
00832
00833
00834
00835
00836
/**
 * Queues a page for an asynchronous transition to the indicated ram class.
 * If the page already has a different transition pending, that request is
 * withdrawn first.  Assumes _tlock is held (see request_ram_class).
 */
void VertexDataPage::PageThreadManager::
add_page(VertexDataPage *page, RamClass ram_class) {
  nassertv(!_shutdown);

  if (page->_pending_ram_class == ram_class) {
    // This exact transition is already queued; nothing to do.
    nassertv(page->get_lru() == &_pending_lru);
    return;
  }

  if (page->_pending_ram_class != page->_ram_class) {
    // Some other transition is queued; cancel it before queuing the new
    // one.
    remove_page(page);
  }

  if (page->_pending_ram_class != ram_class) {
    // Park the page on the pending LRU so it can't be evicted again while
    // it waits, and route it to the read or write queue.  Reads (toward
    // resident) are serviced first by the workers.
    page->mark_used_lru(&_pending_lru);

    page->_pending_ram_class = ram_class;
    if (ram_class == RC_resident) {
      _pending_reads.push_back(page);
    } else {
      _pending_writes.push_back(page);
    }
    _pending_cvar.notify();
  }
}
00868
00869
00870
00871
00872
00873
00874
00875
00876
00877
/**
 * Withdraws a page's pending transition.  If a worker thread is currently
 * processing the page, this blocks until the worker finishes (temporarily
 * dropping the page's _lock to avoid deadlocking with the worker, which
 * acquires it).  Otherwise the page is removed from the pending queue and
 * returned to the LRU matching its current ram class.  Assumes _tlock is
 * held.
 */
void VertexDataPage::PageThreadManager::
remove_page(VertexDataPage *page) {
  nassertv(page != (VertexDataPage *)NULL);

  PageThreads::iterator ti;
  for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
    PageThread *thread = (*ti);
    if (page == thread->_working_page) {
      // A worker has the page in flight; release the page lock (the
      // worker needs it) and wait for the worker to signal completion.
      // _working_cvar shares _tlock, which remains held across the wait.
      page->_lock.release();
      while (page == thread->_working_page) {
        thread->_working_cvar.wait();
      }
      page->_lock.acquire();
      return;
    }
  }

  if (page->_pending_ram_class == RC_resident) {
    PendingPages::iterator pi =
      find(_pending_reads.begin(), _pending_reads.end(), page);
    nassertv(pi != _pending_reads.end());
    _pending_reads.erase(pi);
  } else {
    PendingPages::iterator pi =
      find(_pending_writes.begin(), _pending_writes.end(), page);
    nassertv(pi != _pending_writes.end());
    _pending_writes.erase(pi);
  }

  page->_pending_ram_class = page->_ram_class;

  // Put the page back on the LRU corresponding to its actual state.
  page->mark_used_lru(_global_lru[page->_ram_class]);
}
00914
00915
00916
00917
00918
00919
00920
00921 int VertexDataPage::PageThreadManager::
00922 get_num_threads() const {
00923 return (int)_threads.size();
00924 }
00925
00926
00927
00928
00929
00930
00931
00932 int VertexDataPage::PageThreadManager::
00933 get_num_pending_reads() const {
00934 return (int)_pending_reads.size();
00935 }
00936
00937
00938
00939
00940
00941
00942
00943 int VertexDataPage::PageThreadManager::
00944 get_num_pending_writes() const {
00945 return (int)_pending_writes.size();
00946 }
00947
00948
00949
00950
00951
00952
00953
/**
 * Spawns num_threads additional paging threads, clearing any prior
 * shutdown flag.  Each thread is named by its index in _threads and runs
 * at low priority.  Assumes _tlock is held.
 */
void VertexDataPage::PageThreadManager::
start_threads(int num_threads) {
  _shutdown = false;

  _threads.reserve(num_threads);
  for (int i = 0; i < num_threads; ++i) {
    ostringstream name_strm;
    name_strm << "VertexDataPage" << _threads.size();
    PT(PageThread) thread = new PageThread(this, name_strm.str());
    thread->start(TP_low, true);
    _threads.push_back(thread);
  }
}
00967
00968
00969
00970
00971
00972
00973
00974
/**
 * Signals all paging threads to exit and joins them.  The thread list is
 * swapped out under _tlock, and the joins happen with the lock released so
 * the workers can finish draining their queues.
 */
void VertexDataPage::PageThreadManager::
stop_threads() {
  PageThreads threads;
  {
    MutexHolder holder(_tlock);
    _shutdown = true;
    _pending_cvar.notify_all();
    threads.swap(_threads);
  }

  PageThreads::iterator ti;
  for (ti = threads.begin(); ti != threads.end(); ++ti) {
    PageThread *thread = (*ti);
    thread->join();
  }

  // Workers only exit once both queues are empty (see thread_main).
  nassertv(_pending_reads.empty() && _pending_writes.empty());
}
00993
00994
00995
00996
00997
00998
/**
 * Constructs a paging worker thread.  _working_cvar shares the global
 * _tlock; it is signaled when the thread finishes processing a page (see
 * remove_page, which waits on it).
 */
VertexDataPage::PageThread::
PageThread(PageThreadManager *manager, const string &name) :
  Thread(name, name),
  _manager(manager),
  _working_cvar(_tlock)
{
}
01006
01007
01008
01009
01010
01011
/**
 * Main loop of a paging worker thread.  Holds _tlock except while actually
 * processing a page.  Pulls requests off the manager's queues (reads --
 * toward resident -- take priority over writes), performs the transition
 * under the page's own _lock, then signals _working_cvar for anyone
 * waiting in remove_page().  Exits only when shutdown is flagged AND both
 * queues are empty.
 */
void VertexDataPage::PageThread::
thread_main() {
  _tlock.acquire();

  while (true) {
    PStatClient::thread_tick(get_sync_name());

    // Sleep until there is work; check the queues again after shutdown so
    // pending requests are drained before exit.
    while (_manager->_pending_reads.empty() &&
           _manager->_pending_writes.empty()) {
      if (_manager->_shutdown) {
        _tlock.release();
        return;
      }
      PStatTimer timer(_thread_wait_pcollector);
      _manager->_pending_cvar.wait();
    }

    // Service reads first: a page someone wants resident is more urgent
    // than one being evicted.
    if (!_manager->_pending_reads.empty()) {
      _working_page = _manager->_pending_reads.front();
      _manager->_pending_reads.pop_front();
    } else {
      _working_page = _manager->_pending_writes.front();
      _manager->_pending_writes.pop_front();
    }

    RamClass ram_class = _working_page->_pending_ram_class;
    _tlock.release();

    {
      // Do the (possibly slow) transition without holding _tlock, but
      // with the page's own lock held.
      MutexHolder holder(_working_page->_lock);
      switch (ram_class) {
      case RC_resident:
        _working_page->make_resident();
        break;

      case RC_compressed:
        _working_page->make_compressed();
        break;

      case RC_disk:
        _working_page->make_disk();
        break;

      case RC_end_of_list:
        break;
      }
    }

    _tlock.acquire();

    // Announce completion for remove_page() waiters.
    _working_page = NULL;
    _working_cvar.notify();

    Thread::consider_yield();
  }
}