
Hadoop IO


May 26, 2021 Hadoop


Hadoop - IO

In a MapReduce job, I/O occurs at the following points:

  1. Input files are read from HDFS.
  2. Map output is written to the local disk.
  3. Network I/O between Mapper and Reducer: each Reducer fetches its portion of the Map output from the Mapper nodes.
  4. The Reducer reads the fetched data back from its local disk.
  5. Reducer output is written back to HDFS.

Serialization

Serialization is the process of converting structured objects into byte streams, for transmission over a network or for permanent storage on disk. Deserialization is the reverse process: turning a byte stream back into a structured object.

Serialization is used in two main areas of distributed data processing: interprocess communication and permanent storage.

In Hadoop, communication between processes on different nodes is implemented with Remote Procedure Calls (RPC). The RPC protocol serializes a message into a binary stream and sends it to the remote node, which then deserializes the binary stream back into the original message. In general, an RPC serialization format should have the following properties:

1. Compact

A compact format makes full use of network bandwidth, which is the scarcest resource in a data center.

2. Fast

Interprocess communication forms the backbone of a distributed system, so it is essential to minimize the performance overhead of serialization and deserialization.

3. Extensible

Protocols change over time to meet new requirements, so it must be possible to evolve the protocol in a controlled manner for both clients and servers. For example, it should be possible to add a new parameter to a method call, and a new server should still accept old-format messages (without the new parameter) from old clients.

4. Interoperable

For some systems, it is desirable to support clients and servers written in different languages, so the format needs to be designed to make this possible.


Writable interface

The Writable interface defines two methods: one for writing the object's state to a DataOutput binary stream, and one for reading its state from a DataInput binary stream.
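
For reference, the interface itself is small; it looks like this (as defined in the org.apache.hadoop.io package):

package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

public interface Writable {
  // Serialize this object's fields to the binary stream
  void write(DataOutput out) throws IOException;
  // Deserialize this object's fields from the binary stream
  void readFields(DataInput in) throws IOException;
}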

BytesWritable

BytesWritable is a wrapper for an array of binary data. Its serialized format is a 4-byte integer field specifying the number of data bytes, followed by the data itself. For example, a byte array of length 2 containing the values 3 and 5 is serialized as the 4-byte integer (00000002) followed by the two bytes (03 and 05).
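
A minimal sketch illustrating that format; the serialize() helper is hypothetical and simply writes the Writable to an in-memory stream:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;

public class BytesWritableDemo {

  // Hypothetical helper: serialize a Writable into a byte array
  static byte[] serialize(Writable w) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(out);
    w.write(dataOut);
    dataOut.close();
    return out.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    BytesWritable b = new BytesWritable(new byte[] { 3, 5 });
    byte[] bytes = serialize(b);
    // bytes now holds 6 bytes: 00 00 00 02 03 05
    System.out.println(bytes.length);  // prints 6
  }
}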

NullWritable

NullWritable is a special type of Writable with a zero-length serialization. It reads no data from the stream and writes none to it; it simply acts as a placeholder.
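
NullWritable is an immutable singleton, so you obtain the shared instance with get() rather than constructing one. A small illustrative fragment (classes from org.apache.hadoop.io; the mapper usage in the comment is just one typical pattern):

// NullWritable is a singleton; obtain the shared instance with get().
NullWritable nothing = NullWritable.get();
// Typical use: as a placeholder key or value, e.g. emitting only values from a mapper:
// context.write(NullWritable.get(), value);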

ObjectWritable and GenericWritable

ObjectWritable is a general-purpose wrapper for Java primitives, String, enum, Writable, null, or arrays of these types. It is used in Hadoop RPC to marshal and unmarshal method arguments and return types. GenericWritable is a more compact alternative when the set of possible types is small and known in advance: it serializes a short index into a type array rather than the full class name.

Writable collection classes

The org.apache.hadoop.io package has six Writable collection classes: ArrayWritable, ArrayPrimitiveWritable, TwoDArrayWritable, MapWritable, SortedMapWritable, and EnumSetWritable.

ArrayWritable and TwoDArrayWritable are Writable implementations of arrays and two-dimensional arrays (arrays of arrays) of Writable instances. All elements of an ArrayWritable or TwoDArrayWritable must be instances of the same class. Both classes have get(), set(), and toArray() methods; toArray() creates a shallow copy of the array.

ArrayPrimitiveWritable is a wrapper for arrays of Java primitive types. The component type is detected when you call set(), so there is no need to subclass in order to set the type.
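
MapWritable implements java.util.Map<Writable, Writable> (and SortedMapWritable implements SortedMap), so they can be used as general-purpose Writable maps. A small fragment, assuming the standard wrapper types from org.apache.hadoop.io:

// MapWritable stores Writable keys and values; different entries may use
// different Writable classes.
MapWritable src = new MapWritable();
src.put(new IntWritable(1), new Text("cat"));
src.put(new VIntWritable(2), new LongWritable(163));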

Serialization frameworks

Although most Mapreduce programs use keys and values of the Writable type, this is not mandatory for the MapReduce API. In fact, you can use any type, as long as there is a mechanism for converting each type back and forth between type and binary.

To support this, Hadoop has an API for pluggable serialization frameworks. A serialization framework is represented by an implementation of Serialization (in the org.apache.hadoop.io.serializer package). A Serialization defines a mapping from types to Serializer instances (for turning an object into a byte stream) and Deserializer instances (for turning a byte stream back into an object).
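
The set of registered frameworks is controlled by the io.serializations property. A minimal configuration fragment, assuming you want Java Object Serialization available alongside the default Writable support (class names from org.apache.hadoop.io.serializer):

// Register serialization frameworks; WritableSerialization is the default and
// should normally stay in the list.
Configuration conf = new Configuration();
conf.setStrings("io.serializations",
    "org.apache.hadoop.io.serializer.WritableSerialization",
    "org.apache.hadoop.io.serializer.JavaSerialization");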

Serialization IDL

There are many other serialization frameworks that approach the problem from a different angle: rather than defining types in code, they declare them in a language-neutral way using an interface description language (IDL). The system can then generate bindings for different languages, which is good for interoperability. These frameworks also usually define versioning schemes.

Two of the more popular serialization frameworks, Apache Thrift and Google's Protocol Buffers, are often used as permanent storage formats for binary data. There is limited MapReduce format support for them, but within Hadoop some components still use these two frameworks for RPC and data exchange.

File-based data structure

For some applications, we need a special data structure to hold our data. For MapReduce-based processing, putting each blob of binary data into its own separate file does not scale, so Hadoop developed a number of higher-level containers for this purpose.

About SequenceFile.

Consider a log file, where each line of text represents one log record. Plain text is not suitable for recording binary data. In this case, Hadoop's SequenceFile class is a good fit: it provides a persistent data structure for binary key-value pairs. When you use it as a storage format for log files, you choose the key yourself (for example, a timestamp) and the value can be any Writable type.

SequenceFiles can also be used as containers for small files. HDFS and MapReduce are optimized for large files, so packing small files into a SequenceFile makes storing and processing them more efficient.

Writing a SequenceFile

The static createWriter() method creates a SequenceFile and returns a SequenceFile.Writer instance. The method has several overloaded versions, but all of them require you to specify the stream or path to write to, a Configuration object, and the key and value types. The keys and values stored in a SequenceFile do not have to be Writable types; any type that can be serialized and deserialized by a Serialization may be used.
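
A minimal sketch of writing a SequenceFile; the output path, the key/value types (IntWritable and Text), and the record contents are illustrative choices, not requirements:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("numbers.seq");   // hypothetical output path

    IntWritable key = new IntWritable();
    Text value = new Text();
    SequenceFile.Writer writer = null;
    try {
      // Older, widely documented overload: target filesystem and path,
      // configuration, key class and value class.
      writer = SequenceFile.createWriter(fs, conf, path,
          key.getClass(), value.getClass());
      for (int i = 0; i < 100; i++) {
        key.set(i);
        value.set("record-" + i);
        writer.append(key, value);         // append one key-value pair
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}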

Reading a SequenceFile

Reading a sequence file from beginning to end is a matter of creating a SequenceFile.Reader instance and then repeatedly calling one of the next() methods to iterate over the records. Which one you use depends on the serialization framework. If you are using Writable types, the next() method that takes a key and a value as arguments reads the next key-value pair in the stream into those variables.
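
A minimal sketch of reading a SequenceFile written with Writable keys and values; the input path is illustrative, and the key/value classes are discovered from the file header via the reader:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReadDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("numbers.seq");   // hypothetical input path

    SequenceFile.Reader reader = null;
    try {
      reader = new SequenceFile.Reader(fs, path, conf);
      // Instantiate key and value objects of the classes recorded in the file header
      Writable key = (Writable)
          ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable)
          ReflectionUtils.newInstance(reader.getValueClass(), conf);
      while (reader.next(key, value)) {    // returns false at end of file
        System.out.printf("%s\t%s%n", key, value);
      }
    } finally {
      IOUtils.closeStream(reader);
    }
  }
}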

1. Displaying a SequenceFile with the command-line interface.

The hadoop fs command has a -text option that displays sequence files as text. The option looks at a file's magic number to detect the type of file and convert it to text appropriately. It can recognize gzip-compressed files, sequence files, and Avro data files.

2. Sorting and merging of SequenceFile.

MapReduce is the most effective way to sort (or merge) multiple sequence files. MapReduce is inherently parallel, and you can choose how many reducers to use. For example, by specifying a single reducer you get a single output file.

3. The format of SequenceFile.

A sequence file consists of a header followed by one or more records. The first three bytes of a sequence file are the bytes SEQ, followed by one byte representing the version number of the format. The header also contains other fields, including the names of the key and value classes, compression details, user-defined metadata, and a sync marker. The sync marker is used to identify record boundaries when reading from an arbitrary position in the file. Each file has a randomly generated sync marker whose value is stored in the header; sync markers appear between records in the sequence file. They add less than 1% storage overhead, and they are not necessarily written after every record.

About MapFile

A MapFile is a sorted SequenceFile with an index to permit lookups by key. The index is itself a SequenceFile that contains a fraction of the keys in the map. Because the index can be loaded into memory, it provides fast lookups into the main data file, which is another SequenceFile containing all the map entries in sorted key order.
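
A minimal sketch of writing a MapFile and looking up a key, using one of the older, widely documented constructors; the directory name and the key/value types are illustrative. Keys must be appended in sorted order, or the writer throws an exception:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    String dir = "numbers.map";            // hypothetical MapFile directory

    // Write entries; MapFile requires keys to be appended in sorted order
    MapFile.Writer writer = null;
    try {
      writer = new MapFile.Writer(conf, fs, dir, IntWritable.class, Text.class);
      for (int i = 0; i < 100; i++) {
        writer.append(new IntWritable(i), new Text("entry-" + i));
      }
    } finally {
      IOUtils.closeStream(writer);
    }

    // Look up a key; get() returns null if the key is absent
    MapFile.Reader reader = new MapFile.Reader(fs, dir, conf);
    Text value = new Text();
    reader.get(new IntWritable(42), value);
    System.out.println(value);             // prints "entry-42"
    reader.close();
  }
}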

Other file formats and column-oriented formats

Sequence files and map files are the oldest binary file formats in Hadoop, but they are not the only ones; in fact, there are better binary formats to choose from for new projects.

Avro data files are similar in some respects to sequence files and are designed for large-scale data processing, but Avro data files are portable and can be used across different programming languages. Sequence files, map files, and Avro data files are all row-oriented formats, meaning that the values for each row are stored contiguously in the file. In a column-oriented format, the rows in a file are broken into row splits, and each split is stored column by column: the values of the first column are stored first for every row in the split, then the values of the second column, and so on.

Compression

Compression reduces the space needed to store files and speeds up data transfer across the network or to and from disk.


Hadoop applications process very large data sets and therefore benefit from compression. Which compression format to use depends on the size and format of the files to be processed and on the tools used to process them. The following options are listed roughly in order of effectiveness (most to least effective):

1. Use a container file format, such as sequence files, Avro data files, ORC files, or Parquet files.

2. Use a compression format that supports splitting, such as bzip2, or one that can be indexed to support splitting, such as LZO.

3. Split the file into chunks in the application, and compress each chunk separately using any compression format (whether or not it supports splitting). In this case, choose the chunk size so that each compressed chunk is approximately the size of an HDFS block.

4. Store uncompressed files.
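
Whichever of these options you choose, compression and decompression in Hadoop go through the CompressionCodec API. A minimal sketch that gzip-compresses standard input to standard output; the codec class name is the standard GzipCodec shipped with Hadoop, everything else is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class StreamCompressor {
  public static void main(String[] args) throws Exception {
    // Instantiate the codec by class name
    Configuration conf = new Configuration();
    Class<?> codecClass = Class.forName("org.apache.hadoop.io.compress.GzipCodec");
    CompressionCodec codec =
        (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);

    // Wrap stdout in a compressing stream and copy stdin through it
    CompressionOutputStream out = codec.createOutputStream(System.out);
    IOUtils.copyBytes(System.in, out, 4096, false);
    out.finish();   // flush the compressed stream without closing stdout
  }
}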


Key point: compression and splitting generally conflict. The blocks of a compressed file usually cannot be split and processed independently; a file split will often cut across compressed data, in which case a Map task cannot process it, so for such formats Hadoop frequently uses a single Map task to process the entire file. For large files, avoid compression formats that do not support splitting of the whole file: you lose the locality of the data, and the MapReduce application becomes inefficient.

Map output can also be compressed, which reduces the amount of data transferred from Map to Reduce and speeds up the transfer.
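
A configuration fragment for enabling map output compression with gzip, in the same style as the output-compression snippet below; the property names are the Hadoop 2 names (older releases use mapred.compress.map.output and mapred.map.output.compression.codec):

// Enable compression of the intermediate map output and choose gzip as the codec
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
    GzipCodec.class, CompressionCodec.class);
Job job = Job.getInstance(conf);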


Using compression in MapReduce

// Compress the job output and use gzip as the compression codec
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

If the output is written as sequence files, you can set the mapreduce.output.fileoutputformat.compress.type property to control the type of compression used. The default is RECORD, which compresses each record individually. Changing it to BLOCK compresses groups of records together, which is the recommended setting because it compresses better.
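
A one-line fragment in the same style as the snippet above, using SequenceFileOutputFormat from the org.apache.hadoop.mapreduce.lib.output package:

// Request BLOCK compression for SequenceFile job output; equivalent to setting
// mapreduce.output.fileoutputformat.compress.type=BLOCK in the configuration
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);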

Integrity

  • A common way to detect corrupted data is to compute a checksum when the data first enters the system, and again whenever it passes through an unreliable channel. If the newly computed checksum does not match the original, we consider the data corrupted. Note that this technique only detects errors; it does not fix them. A commonly used error-detecting code is CRC-32 (32-bit cyclic redundancy check), which computes a 32-bit integer checksum for input of any size.
  • A datanode is responsible for verifying the data it receives against its checksum before storing the data and the checksum. It does this both for data received from clients and for data replicated from other datanodes. A client writing data sends it, together with its checksum, down a pipeline of datanodes, and the last datanode in the pipeline verifies the checksum. If a datanode detects an error, the client receives a subclass of IOException.
  • When a client reads data from a datanode, it also verifies checksums, comparing them with the checksums stored at the datanode. Each datanode keeps a persistent log of checksum verifications, so it knows when each of its blocks was last verified. After a client successfully verifies a block, it tells the datanode, which updates its log. Keeping these statistics is valuable for detecting bad disks.
  • In addition to client-side verification on reads, each datanode runs a DataBlockScanner in a background thread that periodically verifies all the blocks stored on that datanode. This guards against corruption in the physical storage media.
  • Because HDFS stores replicas of each block, it can repair a corrupted block by copying an intact replica to produce a new, uncorrupted copy. The basic idea is that when a client detects an error while reading a block, it reports the bad block and the datanode it was reading from to the namenode, and then throws a ChecksumException. The namenode marks that block replica as corrupt so that it no longer directs clients to it or tries to copy it to another datanode. It then schedules a copy of the block to be replicated from an intact replica to another datanode, bringing the replication factor back to the expected level, after which the corrupted replica is deleted.
  • Hadoop's LocalFileSystem performs client-side checksumming. This means that when you write a file called filename, the file system client transparently creates a hidden file, .filename.crc, in the same directory, containing the checksums for each chunk of the file. The chunk size is stored as metadata in the .crc file, so the file can be read back correctly even if the chunk-size setting has changed. Checksums are verified when the file is read, and LocalFileSystem throws a ChecksumException if an error is detected.
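
A small fragment showing the client-side knob, assuming you want to read a (possibly corrupt) local file without checksum verification; FileSystem.getLocal() and setVerifyChecksum() are standard FileSystem calls, and the file name is hypothetical:

// Disable client-side checksum verification before opening a local file
Configuration conf = new Configuration();
LocalFileSystem fs = FileSystem.getLocal(conf);
fs.setVerifyChecksum(false);                            // skip .crc verification
FSDataInputStream in = fs.open(new Path("data.txt"));   // hypothetical file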