/*=========================================================================

  Program:   Visualization Toolkit
  Module:    vtkCGNSFileSeriesReader.cxx

  Copyright (c) Kitware, Inc.
  All rights reserved.
  See Copyright.txt or http://www.paraview.org/HTML/Copyright.html for details.

     This software is distributed WITHOUT ANY WARRANTY; without even
     the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
     PURPOSE.  See the above copyright notice for more information.

=========================================================================*/
#include "vtkCGNSFileSeriesReader.h"

#include "vtkCGNSReader.h"
#include "vtkCommand.h"
#include "vtkCommunicator.h"
#include "vtkDataSet.h"
#include "vtkFileSeriesHelper.h"
#include "vtkInformation.h"
#include "vtkInformationVector.h"
#include "vtkMultiBlockDataSet.h"
#include "vtkMultiPieceDataSet.h"
#include "vtkMultiProcessController.h"
#include "vtkMultiProcessStream.h"
#include "vtkNew.h"
#include "vtkObjectFactory.h"
#include "vtkSmartPointer.h"
#include "vtkStreamingDemandDrivenPipeline.h"
#include "vtksys/RegularExpression.hxx"
#include "vtksys/SystemTools.hxx"

// Standard headers for the names this file uses directly: assert, strcpy,
// std::map, std::set, std::ostringstream, std::string and std::vector.
#include <cassert>
#include <cstring>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <vector>

namespace
{
/**
 * Scope guard for a single variable: the constructor remembers the current
 * value of `var` and assigns `val` to it; the destructor writes the remembered
 * value back. Copying is disabled so the restore happens exactly once.
 */
template <class T>
class SCOPED_SET
{
  T& Var;
  T Prev;

public:
  SCOPED_SET(T& var, const T& val)
    : Var(var)
    , Prev(var)
  {
    this->Var = val;
  }
  ~SCOPED_SET() { this->Var = this->Prev; }

private:
  SCOPED_SET(const SCOPED_SET&) = delete;
  void operator=(const SCOPED_SET&) = delete;
};
}

vtkStandardNewMacro(vtkCGNSFileSeriesReader);
vtkCxxSetObjectMacro(vtkCGNSFileSeriesReader, Controller, vtkMultiProcessController);
//----------------------------------------------------------------------------
// Starts with no internal reader and no controller, configures the algorithm
// as a source (0 input ports, 1 output port) and then adopts the global
// multi-process controller.
vtkCGNSFileSeriesReader::vtkCGNSFileSeriesReader()
  : Reader(nullptr)
  , IgnoreReaderTime(false)
  , Controller(nullptr)
  , ReaderObserverId(0)
  , InProcessRequest(false)
{
  this->SetNumberOfInputPorts(0);
  this->SetNumberOfOutputPorts(1);
  this->SetController(vtkMultiProcessController::GetGlobalController());
}
//---------------------------------------------------------------------------- vtkCGNSFileSeriesReader::~vtkCGNSFileSeriesReader() { this->SetReader(nullptr); this->SetController(nullptr); } //---------------------------------------------------------------------------- int vtkCGNSFileSeriesReader::CanReadFile(const char* filename) { return this->Reader ? this->Reader->CanReadFile(filename) : 0; } //---------------------------------------------------------------------------- void vtkCGNSFileSeriesReader::PrintSelf(ostream& os, vtkIndent indent) { this->Superclass::PrintSelf(os, indent); os << indent << "Reader: " << this->Reader << endl; os << indent << "IgnoreReaderTime: " << this->IgnoreReaderTime << endl; } //---------------------------------------------------------------------------- void vtkCGNSFileSeriesReader::AddFileName(const char* fname) { this->FileSeriesHelper->AddFileName(fname); } //---------------------------------------------------------------------------- void vtkCGNSFileSeriesReader::RemoveAllFileNames() { this->FileSeriesHelper->RemoveAllFileNames(); } //---------------------------------------------------------------------------- const char* vtkCGNSFileSeriesReader::GetCurrentFileName() const { return this->Reader ? 
this->Reader->GetFileName() : nullptr; } //---------------------------------------------------------------------------- void vtkCGNSFileSeriesReader::SetReader(vtkCGNSReader* reader) { if (this->Reader == reader) { return; } if (this->Reader) { this->RemoveObserver(this->ReaderObserverId); } vtkSetObjectBodyMacro(Reader, vtkCGNSReader, reader); if (this->Reader) { this->ReaderObserverId = this->Reader->AddObserver( vtkCommand::ModifiedEvent, this, &vtkCGNSFileSeriesReader::OnReaderModifiedEvent); } } //---------------------------------------------------------------------------- void vtkCGNSFileSeriesReader::OnReaderModifiedEvent() { if (!this->InProcessRequest) { this->Modified(); } } //---------------------------------------------------------------------------- int vtkCGNSFileSeriesReader::ProcessRequest( vtkInformation* request, vtkInformationVector** inputVector, vtkInformationVector* outputVector) { if (!this->Reader) { vtkErrorMacro("`Reader` cannot be NULL."); return 0; } int requestFromPort = request->Has(vtkStreamingDemandDrivenPipeline::FROM_OUTPUT_PORT()) ? request->Get(vtkStreamingDemandDrivenPipeline::FROM_OUTPUT_PORT()) : 0; assert(requestFromPort < this->GetNumberOfOutputPorts()); vtkInformation* outInfo = outputVector->GetInformationObject(requestFromPort); assert(this->InProcessRequest == false); SCOPED_SET markInProgress(this->InProcessRequest, true); // Since we are dealing with potentially temporal or partitioned fileseries, a // single rank may have to read more than 1 file. Before processing any // pipeline pass, let's make sure we have built up the set of active files. if (!this->UpdateActiveFileSet(outInfo)) { return 0; } // Before we continue processing the request, let's decide what mode should // the internal vtkCGNSReader work i.e. should it handle parallel processing // by splitting blocks across ranks, or are we letting // vtkCGNSReaderFileSeriesReader split files among ranks. 
if (this->FileSeriesHelper->GetPartitionedFiles()) { this->Reader->SetController(nullptr); this->Reader->SetDistributeBlocks(false); } else { this->Reader->SetController(this->Controller); this->Reader->SetDistributeBlocks(true); } if (this->FileSeriesHelper->GetPartitionedFiles() && request->Has(vtkStreamingDemandDrivenPipeline::REQUEST_DATA())) { // For REQUEST_DATA(), we need to iterate over all files in the active // set. if (!this->RequestData(request, inputVector, outputVector)) { return 0; } } else { // For most pipeline passes, it's sufficient to choose the first file in the // active set, if any, and then pass the request to the internal reader. if (!this->ActiveFiles.empty()) { this->ChooseActiveFile(0); if (!this->Reader->ProcessRequest(request, inputVector, outputVector)) { return 0; } } } // restore time information. this->FileSeriesHelper->FillTimeInformation(outInfo); return 1; } namespace vtkCGNSFileSeriesReaderNS { static bool SetFileNameCallback(vtkAlgorithm* reader, const std::string& fname) { if (vtkCGNSReader* cgnsReader = vtkCGNSReader::SafeDownCast(reader)) { cgnsReader->SetFileName(fname.c_str()); return true; } return false; }; } //---------------------------------------------------------------------------- bool vtkCGNSFileSeriesReader::UpdateActiveFileSet(vtkInformation* outInfo) { // Pass ivars to vtkFileSeriesHelper. this->FileSeriesHelper->SetIgnoreReaderTime(this->IgnoreReaderTime); // We pass a new instance of the reader to vtkFileSeriesHelper to avoid // mucking with this->Reader to just gather the file's time meta-data. vtkSmartPointer reader = vtkSmartPointer::Take(this->Reader->NewInstance()); reader->SetController(nullptr); reader->SetDistributeBlocks(false); // Update vtkFileSeriesHelper. Make it process all the filenames provided and // collect useful metadata from it. This is a no-op if the vtkFileSeriesHelper // wasn't modified. 
if (!this->FileSeriesHelper->UpdateInformation( reader.Get(), vtkCGNSFileSeriesReaderNS::SetFileNameCallback)) { return false; } // For current time/local partition, we need to determine which files to // read. Let's determine that. this->ActiveFiles = this->FileSeriesHelper->GetActiveFiles(outInfo); // For temporal file series, the active set should only have 1 file. If more than 1 // file matches the timestep, it means that we may have invalid time information // in the file series. Warn about it. if (!this->FileSeriesHelper->GetPartitionedFiles() && this->ActiveFiles.size() > 1) { vtkWarningMacro( "The CGNS file series may have incorrect (or duplicate) " "time values for a temporal file series. You may want to turn on 'IgnoreReaderTime'."); } return true; } //---------------------------------------------------------------------------- void vtkCGNSFileSeriesReader::ChooseActiveFile(int index) { std::string fname = (index < static_cast(this->ActiveFiles.size())) ? this->ActiveFiles[index] : std::string(); if (this->Reader->GetFileName() == nullptr || this->Reader->GetFileName() != fname) { this->Reader->SetFileName(fname.c_str()); this->Reader->UpdateInformation(); } } //---------------------------------------------------------------------------- #if defined(_MSC_VER) // VS generates `warning C4503 : decorated name length exceeded, name was truncated` // warning for the `BasesMap` data structure defined below. Since it's a private // data structure, we ignore this warning. #pragma warning(disable : 4503) #endif namespace { /** * This helps me sync up the multiblock structure across ranks. * This is a little hard-coded to the output of CGNS reader. It may be worthwhile to * generalize this to a filter and then simply use that. 
*/ struct ANode { std::map Children; std::vector> Datasets; ANode() = default; ~ANode() { for (auto iter = this->Children.begin(); iter != this->Children.end(); ++iter) { delete iter->second; } } void Add(vtkMultiBlockDataSet* mb) { vtksys::RegularExpression nameRe("^(.*)_proc-([0-9]+)$"); for (unsigned int cc = 0, max = mb->GetNumberOfBlocks(); cc < max; ++cc) { std::string name = mb->GetMetaData(cc)->Get(vtkCompositeDataSet::NAME()); if (nameRe.find(name)) { name = nameRe.match(1); } auto citer = this->Children.find(name); if (citer == this->Children.end()) { ANode* child = new ANode(); this->Children[name] = child; child->Add(mb->GetBlock(cc)); } else { citer->second->Add(mb->GetBlock(cc)); } } } vtkSmartPointer Get() const { if (this->Children.empty()) { if (this->Datasets.size() == 1) { return this->Datasets.front(); } else if (!this->Datasets.empty()) { vtkNew mp; mp->SetNumberOfPieces(static_cast(this->Datasets.size())); for (unsigned int cc = 0; cc < mp->GetNumberOfPieces(); ++cc) { mp->SetPiece(cc, this->Datasets[cc]); } return mp.Get(); } return nullptr; } else { vtkNew mb; mb->SetNumberOfBlocks(static_cast(this->Children.size())); unsigned int blockNo = 0; for (auto iter = this->Children.begin(); iter != this->Children.end(); ++iter, ++blockNo) { mb->SetBlock(blockNo, iter->second->Get()); mb->GetMetaData(blockNo)->Set(vtkCompositeDataSet::NAME(), iter->first.c_str()); } return mb.Get(); } } bool SyncMetadata(vtkMultiProcessController* controller) { // note: this is not optimized to deep trees. const unsigned int child_count = static_cast(this->Children.size()); unsigned int max_child_count; controller->AllReduce(&child_count, &max_child_count, 1, vtkCommunicator::MAX_OP); unsigned int ds_count = static_cast(this->Datasets.size()); unsigned int total_ds_count; controller->AllReduce(&ds_count, &total_ds_count, 1, vtkCommunicator::SUM_OP); assert(max_child_count == 0 || total_ds_count == 0); // one of two must be 0. 
if (max_child_count > 0) { std::set cnames; for (auto citer = this->Children.begin(); citer != this->Children.end(); ++citer) { cnames.insert(citer->first); } this->AllReduce(cnames, controller); for (auto cniter = cnames.begin(); cniter != cnames.end(); ++cniter) { const std::string& name = (*cniter); if (this->Children.find(name) == this->Children.end()) { this->Children[name] = new ANode(); } } // Sync all children. for (auto citer = this->Children.begin(); citer != this->Children.end(); ++citer) { citer->second->SyncMetadata(controller); } } else if (total_ds_count > 0) { this->Datasets.resize(total_ds_count); } return true; } private: void Add(vtkDataObject* dobj) { if (vtkMultiBlockDataSet* mb = vtkMultiBlockDataSet::SafeDownCast(dobj)) { this->Add(mb); } else { this->Datasets.emplace_back(vtkDataSet::SafeDownCast(dobj)); } } void AllReduce(std::set& names, vtkMultiProcessController* controller) { std::ostringstream str; for (auto iter = names.begin(); iter != names.end(); ++iter) { str << (*iter) << '\n'; } int slen = static_cast(str.str().size()) + 1; // includes room for null terminator. int maxslen = 0; controller->AllReduce(&slen, &maxslen, 1, vtkCommunicator::MAX_OP); int numRanks = controller->GetNumberOfProcesses(); std::vector buffer(numRanks * maxslen, '\0'); std::vector sbuffer(maxslen, '\0'); strcpy(&sbuffer[0], str.str().c_str()); controller->AllGather(&sbuffer[0], &buffer[0], maxslen); names.clear(); for (int cc = 0; cc < numRanks; ++cc) { std::vector rnames; vtksys::SystemTools::Split(std::string(&buffer[cc * maxslen]), rnames, '\n'); names.insert(rnames.begin(), rnames.end()); } } }; } //---------------------------------------------------------------------------- int vtkCGNSFileSeriesReader::RequestData( vtkInformation* request, vtkInformationVector** inputVector, vtkInformationVector* outputVector) { // iterate over all files in the active set and collect the data. 
ANode hierarchy; int success = 1; for (size_t cc = 0, max = this->ActiveFiles.size(); cc < max; ++cc) { this->ChooseActiveFile(static_cast(cc)); if (this->Reader->ProcessRequest(request, inputVector, outputVector)) { vtkMultiBlockDataSet* output = vtkMultiBlockDataSet::GetData(outputVector, 0); assert(output); hierarchy.Add(output); output->Initialize(); } else { vtkErrorMacro("Failed to read '" << this->GetCurrentFileName() << "'"); success = 0; break; } } if (this->Controller && this->Controller->GetNumberOfProcesses() > 1) { int allSuccess = 0; this->Controller->AllReduce(&success, &allSuccess, 1, vtkCommunicator::MIN_OP); if (allSuccess == 0) { return 0; } } if (this->Controller && this->Controller->GetNumberOfProcesses() > 1) { // Ensure all ranks have same meta-data about the number of bases and zones. hierarchy.SyncMetadata(this->Controller); } vtkMultiBlockDataSet* output = vtkMultiBlockDataSet::GetData(outputVector, 0); output->Initialize(); output->ShallowCopy(hierarchy.Get()); return 1; }