/*========================================================================= Program: Visualization Toolkit Module: vtkMultiProcessStream.cxx Copyright (c) Ken Martin, Will Schroeder, Bill Lorensen All rights reserved. See Copyright.txt or http://www.kitware.com/Copyright.htm for details. This software is distributed WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the above copyright notice for more information. =========================================================================*/ #include "vtkMultiProcessStream.h" #include "vtkEndian.h" #include "vtkObjectFactory.h" #include "vtkSocketCommunicator.h" // for vtkSwap8 and vtkSwap4 macros. #include #include class vtkMultiProcessStream::vtkInternals { public: typedef std::deque DataType; DataType Data; enum Types { int32_value, uint32_value, char_value, uchar_value, double_value, float_value, string_value, int64_value, uint64_value, stream_value }; void Push(const unsigned char* data, size_t length) { for (size_t cc = 0; cc < length; cc++) { this->Data.push_back(data[cc]); } } void Pop(unsigned char* data, size_t length) { for (size_t cc = 0; cc < length; cc++) { data[cc] = this->Data.front(); this->Data.pop_front(); } } void SwapBytes() { DataType::iterator iter = this->Data.begin(); while (iter != this->Data.end()) { unsigned char type = *iter; int wordSize = 1; ++iter; switch (type) { case int32_value: case uint32_value: wordSize = sizeof(int); break; case float_value: wordSize = sizeof(float); break; case double_value: wordSize = sizeof(double); break; case char_value: case uchar_value: wordSize = sizeof(char); break; case stream_value: wordSize = sizeof(int); break; case string_value: // We want to bitswap the string size which is an int wordSize = sizeof(int); break; } switch (wordSize) { case 1: break; case 4: vtkSwap4(&(*iter)); break; case 8: vtkSwap8(&(*iter)); break; } // In case of string we don't need to swap char values int nbSkip = 0; if (type == string_value || type == stream_value) { nbSkip = *reinterpret_cast(&*iter); } while (wordSize > 0) { ++iter; wordSize--; } // Skip String chars for (int cc = 0; cc < nbSkip; cc++) { ++iter; } } } }; //------------------------------------------------------------------------------ vtkMultiProcessStream::vtkMultiProcessStream() { this->Internals = new vtkMultiProcessStream::vtkInternals(); #ifdef VTK_WORDS_BIGENDIAN this->Endianness = vtkMultiProcessStream::BigEndian; #else this->Endianness = vtkMultiProcessStream::LittleEndian; #endif } //------------------------------------------------------------------------------ vtkMultiProcessStream::~vtkMultiProcessStream() { delete this->Internals; this->Internals = nullptr; } //------------------------------------------------------------------------------ vtkMultiProcessStream::vtkMultiProcessStream(const vtkMultiProcessStream& other) { this->Internals = new vtkMultiProcessStream::vtkInternals(); this->Internals->Data = other.Internals->Data; this->Endianness = other.Endianness; } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator=(const vtkMultiProcessStream& other) { if (this == &other) { return *this; } this->Internals->Data = other.Internals->Data; this->Endianness = other.Endianness; return (*this); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Reset() { this->Internals->Data.clear(); } //------------------------------------------------------------------------------ int vtkMultiProcessStream::Size() { return (static_cast(this->Internals->Data.size())); } //------------------------------------------------------------------------------ bool vtkMultiProcessStream::Empty() { return (this->Internals->Data.empty()); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Push(double array[], unsigned int size) { assert("pre: array is nullptr!" && (array != nullptr)); this->Internals->Data.push_back(vtkInternals::double_value); this->Internals->Push(reinterpret_cast(&size), sizeof(unsigned int)); this->Internals->Push(reinterpret_cast(array), sizeof(double) * size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Push(float array[], unsigned int size) { assert("pre: array is nullptr!" && (array != nullptr)); this->Internals->Data.push_back(vtkInternals::float_value); this->Internals->Push(reinterpret_cast(&size), sizeof(unsigned int)); this->Internals->Push(reinterpret_cast(array), sizeof(float) * size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Push(int array[], unsigned int size) { assert("pre: array is nullptr!" && (array != nullptr)); this->Internals->Data.push_back(vtkInternals::int32_value); this->Internals->Push(reinterpret_cast(&size), sizeof(unsigned int)); this->Internals->Push(reinterpret_cast(array), sizeof(int) * size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Push(char array[], unsigned int size) { assert("pre: array is nullptr!" && (array != nullptr)); this->Internals->Data.push_back(vtkInternals::char_value); this->Internals->Push(reinterpret_cast(&size), sizeof(unsigned int)); this->Internals->Push(reinterpret_cast(array), sizeof(char) * size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Push(unsigned int array[], unsigned int size) { assert("pre: array is nullptr!" && (array != nullptr)); this->Internals->Data.push_back(vtkInternals::uint32_value); this->Internals->Push(reinterpret_cast(&size), sizeof(unsigned int)); this->Internals->Push(reinterpret_cast(array), sizeof(unsigned int) * size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Push(unsigned char array[], unsigned int size) { assert("pre: array is nullptr!" && (array != nullptr)); this->Internals->Data.push_back(vtkInternals::uchar_value); this->Internals->Push(reinterpret_cast(&size), sizeof(unsigned int)); this->Internals->Push(array, size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Push(vtkTypeInt64 array[], unsigned int size) { assert("pre: array is nullptr!" && (array != nullptr)); this->Internals->Data.push_back(vtkInternals::int64_value); this->Internals->Push(reinterpret_cast(&size), sizeof(unsigned int)); this->Internals->Push(reinterpret_cast(array), sizeof(vtkTypeUInt64) * size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Push(vtkTypeUInt64 array[], unsigned int size) { assert("pre: array is nullptr!" && (array != nullptr)); this->Internals->Data.push_back(vtkInternals::uint64_value); this->Internals->Push(reinterpret_cast(&size), sizeof(unsigned int)); this->Internals->Push(reinterpret_cast(array), sizeof(vtkTypeUInt64) * size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Pop(double*& array, unsigned int& size) { assert("pre: stream data must be double" && this->Internals->Data.front() == vtkInternals::double_value); this->Internals->Data.pop_front(); if (array == nullptr) { // Get the size of the array this->Internals->Pop(reinterpret_cast(&size), sizeof(unsigned int)); // Allocate array array = new double[size]; assert("ERROR: cannot allocate array" && (array != nullptr)); } else { unsigned int sz; // Get the size of the array this->Internals->Pop(reinterpret_cast(&sz), sizeof(unsigned int)); assert("ERROR: input array size does not match size of data" && (sz == size)); } // Pop the array data this->Internals->Pop(reinterpret_cast(array), sizeof(double) * size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Pop(float*& array, unsigned int& size) { assert( "pre: stream data must be float" && this->Internals->Data.front() == vtkInternals::float_value); this->Internals->Data.pop_front(); if (array == nullptr) { // Get the size of the array this->Internals->Pop(reinterpret_cast(&size), sizeof(unsigned int)); // Allocate array array = new float[size]; assert("ERROR: cannot allocate array" && (array != nullptr)); } else { unsigned int sz; // Get the size of the array this->Internals->Pop(reinterpret_cast(&sz), sizeof(unsigned int)); assert("ERROR: input array size does not match size of data" && (sz == size)); } // Pop the array data this->Internals->Pop(reinterpret_cast(array), sizeof(float) * size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Pop(int*& array, unsigned int& size) { assert( "pre: stream data must be int" && this->Internals->Data.front() == vtkInternals::int32_value); this->Internals->Data.pop_front(); if (array == nullptr) { // Get the size of the array this->Internals->Pop(reinterpret_cast(&size), sizeof(unsigned int)); // Allocate the array array = new int[size]; assert("ERROR: cannot allocate array" && (array != nullptr)); } else { unsigned int sz; // Get the size of the array this->Internals->Pop(reinterpret_cast(&sz), sizeof(unsigned int)); assert("ERROR: input array size does not match size of data" && (sz == size)); } // Pop the array data this->Internals->Pop(reinterpret_cast(array), sizeof(int) * size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Pop(char*& array, unsigned int& size) { assert("pre: stream data must be of type char" && this->Internals->Data.front() == vtkInternals::char_value); this->Internals->Data.pop_front(); if (array == nullptr) { // Get the size of the array this->Internals->Pop(reinterpret_cast(&size), sizeof(unsigned int)); // Allocate the array array = new char[size]; assert("ERROR: cannot allocate array" && (array != nullptr)); } else { unsigned int sz; // Get the size of the array this->Internals->Pop(reinterpret_cast(&sz), sizeof(unsigned int)); assert("ERROR: input array size does not match size of data" && (sz == size)); } // Pop the array data this->Internals->Pop(reinterpret_cast(array), sizeof(char) * size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Pop(unsigned int*& array, unsigned int& size) { assert("pre: stream data must be of type unsigned int" && this->Internals->Data.front() == vtkInternals::uint32_value); this->Internals->Data.pop_front(); if (array == nullptr) { // Get the size of the array this->Internals->Pop(reinterpret_cast(&size), sizeof(unsigned int)); // Allocate the array array = new unsigned int[size]; assert("ERROR: cannot allocate array" && (array != nullptr)); } else { unsigned int sz; // Get the size of the array this->Internals->Pop(reinterpret_cast(&sz), sizeof(unsigned int)); assert("ERROR: input array size does not match size of data" && (sz == size)); } // Pop the array data this->Internals->Pop(reinterpret_cast(array), sizeof(unsigned int) * size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Pop(unsigned char*& array, unsigned int& size) { assert("pre: stream data must be of type unsigned char" && this->Internals->Data.front() == vtkInternals::uchar_value); this->Internals->Data.pop_front(); if (array == nullptr) { // Get the size of the array this->Internals->Pop(reinterpret_cast(&size), sizeof(unsigned int)); // Allocate the array array = new unsigned char[size]; assert("ERROR: cannot allocate array" && (array != nullptr)); } else { unsigned int sz; // Get the size of the array this->Internals->Pop(reinterpret_cast(&sz), sizeof(unsigned int)); assert("ERROR: input array size does not match size of data" && (sz == size)); } // Pop the array data this->Internals->Pop(array, size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Pop(vtkTypeInt64*& array, unsigned int& size) { assert("pre: stream data must be of type vtkTypeInt64" && this->Internals->Data.front() == vtkInternals::int64_value); this->Internals->Data.pop_front(); if (array == nullptr) { // Get the size of the array this->Internals->Pop(reinterpret_cast(&size), sizeof(unsigned int)); // Allocate the array array = new vtkTypeInt64[size]; assert("ERROR: cannot allocate array" && (array != nullptr)); } else { unsigned int sz; // Get the size of the array this->Internals->Pop(reinterpret_cast(&sz), sizeof(unsigned int)); assert("ERROR: input array size does not match size of data" && (sz == size)); } // Pop the array data this->Internals->Pop(reinterpret_cast(array), sizeof(vtkTypeInt64) * size); } //------------------------------------------------------------------------------ void vtkMultiProcessStream::Pop(vtkTypeUInt64*& array, unsigned int& size) { assert("pre: stream data must be of type vtkTypeUInt64" && this->Internals->Data.front() == vtkInternals::uint64_value); this->Internals->Data.pop_front(); if (array == nullptr) { // Get the size of the array this->Internals->Pop(reinterpret_cast(&size), sizeof(unsigned int)); // Allocate the array array = new vtkTypeUInt64[size]; assert("ERROR: cannot allocate array" && (array != nullptr)); } else { unsigned int sz; // Get the size of the array this->Internals->Pop(reinterpret_cast(&sz), sizeof(unsigned int)); assert("ERROR: input array size does not match size of data" && (sz == size)); } // Pop the array data this->Internals->Pop(reinterpret_cast(array), sizeof(vtkTypeUInt64) * size); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator<<(double value) { this->Internals->Data.push_back(vtkInternals::double_value); this->Internals->Push(reinterpret_cast(&value), sizeof(double)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator<<(float value) { this->Internals->Data.push_back(vtkInternals::float_value); this->Internals->Push(reinterpret_cast(&value), sizeof(float)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator<<(int value) { this->Internals->Data.push_back(vtkInternals::int32_value); this->Internals->Push(reinterpret_cast(&value), sizeof(int)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator<<(char value) { this->Internals->Data.push_back(vtkInternals::char_value); this->Internals->Push(reinterpret_cast(&value), sizeof(char)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator<<(bool v) { char value = v; this->Internals->Data.push_back(vtkInternals::char_value); this->Internals->Push(reinterpret_cast(&value), sizeof(char)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator<<(unsigned int value) { this->Internals->Data.push_back(vtkInternals::uint32_value); this->Internals->Push(reinterpret_cast(&value), sizeof(unsigned int)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator<<(unsigned char value) { this->Internals->Data.push_back(vtkInternals::uchar_value); this->Internals->Push(&value, sizeof(unsigned char)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator<<(vtkTypeInt64 value) { this->Internals->Data.push_back(vtkInternals::int64_value); this->Internals->Push(reinterpret_cast(&value), sizeof(vtkTypeInt64)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator<<(vtkTypeUInt64 value) { this->Internals->Data.push_back(vtkInternals::uint64_value); this->Internals->Push(reinterpret_cast(&value), sizeof(vtkTypeUInt64)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator<<(const char* value) { this->operator<<(std::string(value)); return *this; } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator<<(const std::string& value) { // Find the real string size int size = static_cast(value.size()); // Set the type this->Internals->Data.push_back(vtkInternals::string_value); // Set the string size this->Internals->Push(reinterpret_cast(&size), sizeof(int)); // Set the string content for (int idx = 0; idx < size; idx++) { this->Internals->Push(reinterpret_cast(&value[idx]), sizeof(char)); } return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator<<(const vtkMultiProcessStream& value) { unsigned int size = static_cast(value.Internals->Data.size()); size += 1; this->Internals->Data.push_back(vtkInternals::stream_value); this->Internals->Push(reinterpret_cast(&size), sizeof(unsigned int)); this->Internals->Data.push_back(value.Endianness); this->Internals->Data.insert( this->Internals->Data.end(), value.Internals->Data.begin(), value.Internals->Data.end()); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator>>(vtkMultiProcessStream& value) { assert(this->Internals->Data.front() == vtkInternals::stream_value); this->Internals->Data.pop_front(); unsigned int size; this->Internals->Pop(reinterpret_cast(&size), sizeof(unsigned int)); assert(size >= 1); this->Internals->Pop(&value.Endianness, 1); size--; value.Internals->Data.resize(size); this->Internals->Pop(&value.Internals->Data[0], size); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator>>(double& value) { assert(this->Internals->Data.front() == vtkInternals::double_value); this->Internals->Data.pop_front(); this->Internals->Pop(reinterpret_cast(&value), sizeof(double)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator>>(float& value) { assert(this->Internals->Data.front() == vtkInternals::float_value); this->Internals->Data.pop_front(); this->Internals->Pop(reinterpret_cast(&value), sizeof(float)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator>>(int& value) { // Automatically convert 64 bit values in case we are trying to transfer // vtkIdType with processes compiled with 32/64 values. if (this->Internals->Data.front() == vtkInternals::int64_value) { vtkTypeInt64 value64; (*this) >> value64; value = static_cast(value64); return (*this); } assert(this->Internals->Data.front() == vtkInternals::int32_value); this->Internals->Data.pop_front(); this->Internals->Pop(reinterpret_cast(&value), sizeof(int)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator>>(char& value) { assert(this->Internals->Data.front() == vtkInternals::char_value); this->Internals->Data.pop_front(); this->Internals->Pop(reinterpret_cast(&value), sizeof(char)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator>>(bool& v) { char value; assert(this->Internals->Data.front() == vtkInternals::char_value); this->Internals->Data.pop_front(); this->Internals->Pop(reinterpret_cast(&value), sizeof(char)); v = (value != 0); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator>>(unsigned int& value) { assert(this->Internals->Data.front() == vtkInternals::uint32_value); this->Internals->Data.pop_front(); this->Internals->Pop(reinterpret_cast(&value), sizeof(unsigned int)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator>>(unsigned char& value) { assert(this->Internals->Data.front() == vtkInternals::uchar_value); this->Internals->Data.pop_front(); this->Internals->Pop(reinterpret_cast(&value), sizeof(unsigned char)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator>>(vtkTypeInt64& value) { // Automatically convert 64 bit values in case we are trying to transfer // vtkIdType with processes compiled with 32/64 values. if (this->Internals->Data.front() == vtkInternals::int32_value) { int value32; (*this) >> value32; value = value32; return (*this); } assert(this->Internals->Data.front() == vtkInternals::int64_value); this->Internals->Data.pop_front(); this->Internals->Pop(reinterpret_cast(&value), sizeof(vtkTypeInt64)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator>>(vtkTypeUInt64& value) { assert(this->Internals->Data.front() == vtkInternals::uint64_value); this->Internals->Data.pop_front(); this->Internals->Pop(reinterpret_cast(&value), sizeof(vtkTypeUInt64)); return (*this); } //------------------------------------------------------------------------------ vtkMultiProcessStream& vtkMultiProcessStream::operator>>(std::string& value) { value = ""; assert(this->Internals->Data.front() == vtkInternals::string_value); this->Internals->Data.pop_front(); int stringSize; this->Internals->Pop(reinterpret_cast(&stringSize), sizeof(int)); char c_value; for (int idx = 0; idx < stringSize; idx++) { this->Internals->Pop(reinterpret_cast(&c_value), sizeof(char)); value += c_value; } return (*this); } //------------------------------------------------------------------------------ std::vector vtkMultiProcessStream::GetRawData() const { std::vector data; this->GetRawData(data); return data; } //------------------------------------------------------------------------------ void vtkMultiProcessStream::GetRawData(std::vector& data) const { data.clear(); data.push_back(this->Endianness); data.resize(1 + this->Internals->Data.size()); vtkInternals::DataType::iterator iter; int cc = 1; for (iter = this->Internals->Data.begin(); iter != this->Internals->Data.end(); ++iter, ++cc) { data[cc] = (*iter); } } //------------------------------------------------------------------------------ void vtkMultiProcessStream::SetRawData(const std::vector& data) { this->Internals->Data.clear(); unsigned char endianness = data.front(); std::vector::const_iterator iter = data.begin(); ++iter; this->Internals->Data.resize(data.size() - 1); int cc = 0; for (; iter != data.end(); ++iter, ++cc) { this->Internals->Data[cc] = *iter; } if (this->Endianness != endianness) { this->Internals->SwapBytes(); } } //------------------------------------------------------------------------------ void vtkMultiProcessStream::GetRawData(unsigned char*& data, unsigned int& size) { delete[] data; size = this->Size() + 1; data = new unsigned char[size + 1]; assert("pre: cannot allocate raw data buffer" && (data != nullptr)); data[0] = this->Endianness; vtkInternals::DataType::iterator iter = this->Internals->Data.begin(); for (int idx = 1; iter != this->Internals->Data.end(); ++iter, ++idx) { data[idx] = *iter; } } //------------------------------------------------------------------------------ void vtkMultiProcessStream::SetRawData(const unsigned char* data, unsigned int size) { this->Internals->Data.clear(); if (size > 0) { unsigned char endianness = data[0]; this->Internals->Data.resize(size - 1); int cc = 0; for (; cc < static_cast(size - 1); cc++) { this->Internals->Data[cc] = data[cc + 1]; } if (this->Endianness != endianness) { this->Internals->SwapBytes(); } } }