Hadoop LZO 압축 [회전]

12228 단어 hadoop알고리즘
hadop 에서 lzo 의 압축 알고리즘 을 사용 하면 데이터 의 크기 와 데이터 의 디스크 읽 기와 쓰기 시간 을 줄 일 수 있 습 니 다. 뿐만 아니 라 lzo 는 block 블록 을 기반 으로 합 니 다. 그러면 그 는 데이터 가 chunk 로 분해 되 고 hadop 에 의 해 병행 되 는 것 을 허용 합 니 다.이러한 특징 은 lzo 를 hadop 에서 매우 좋 은 압축 형식 으로 만 들 수 있다.   lzo 자 체 는 splitable 이 아니 기 때문에 데이터 가 text 형식 일 때 lzo 로 압축 된 데 이 터 를 job 로 입력 하 는 것 은 하나의 파일 로 map 입 니 다.그러나 sequence file 자 체 는 블록 으로 나 뉘 어 있 기 때문에 sequence file 형식의 파일 에 lzo 의 압축 형식 을 더 하면 lzo 파일 방식 의 splitable 을 실현 할 수 있 습 니 다.   압축 된 데 이 터 는 보통 원본 데이터 의 1 / 4 에 불과 하기 때문에 HDFS 에 압축 데 이 터 를 저장 하면 클 러 스 터 가 더 많은 데 이 터 를 저장 하고 클 러 스 터 의 사용 수명 을 연장 할 수 있다.뿐만 아니 라 Mapreduce 작업 은 보통 병목 이 IO 에 있 기 때문에 압축 데 이 터 를 저장 하 는 것 은 더 적은 IO 작업 을 의미 하고 job 운행 이 더욱 효율 적 이라는 것 을 의미한다.그러나 hadop 에서 압축 을 사용 하 는 것 도 두 가지 번 거 로 운 점 이 있 습 니 다. 첫째, 일부 압축 형식 은 블록 으로 나 눌 수 없고 병행 적 으로 처리 할 수 없습니다. 예 를 들 어 gzip.둘째, 다른 압축 형식 은 블록 처 리 를 지원 하지만 압축 을 푸 는 과정 이 매우 느 려 서 job 의 병목 을 cpu 로 옮 겼 습 니 다. 예 를 들 어 bzip 2.예 를 들 어 우 리 는 1.1GB 의 gzip 파일 이 있 는데 이 파일 은 128 MB / chunk 로 나 뉘 어 hdfs 에 저장 되면 9 조각 으로 나 눌 것 이다.mapreduce 에서 각 chunk 를 병행 처리 할 수 있 도록 각 mapper 간 에 의존 합 니 다.두 번 째 mapper 는 파일 의 무 작위 byte 에서 처 리 됩 니 다.그러면 gzip 압축 을 풀 때 사용 할 컨 텍스트 사전 이 비어 있 습 니 다. 이것 은 gzip 의 압축 파일 이 hadop 에서 정확 한 병행 처 리 를 할 수 없다 는 것 을 의미 합 니 다.따라서 hadop 에서 큰 gzip 압축 파일 은 하나의 mapper 에 의 해 하나의 처리 만 할 수 있 습 니 다. 그러면 효율 적 이지 않 고 Maprediuce 를 사용 하지 않 는 것 과 다 를 바 없습니다.또 다른 bzip 2 압축 형식 은 bzip 2 의 압축 이 매우 빠 르 고 심지어 블록 으로 나 눌 수 있 지만 압축 을 푸 는 과정 이 매우 느 리 고 streaming 으로 읽 을 수 없 으 며 hadop 에서 이런 압축 을 효율적으로 사용 할 수 없다.사용 하 더 라 도 스트레스 해소 의 비효 율 로 인해 job 의 병목 이 cpu 로 옮 겨 갈 수 있 습 니 다.만약 압축 알고리즘 을 가지 고 있다 면 블록 을 나 누고 병행 할 수 있 으 며 속도 도 매우 빠르다 면 매우 이상 적 이다.이런 방식 이 바로 lzo 다.lzo 의 압축 파일 은 많은 작은 blocks 로 구성 되 어 있 으 며, hadop 의 job 는 block 의 구분 에 따라 split job 를 할 수 있 습 니 다.뿐만 아니 라 lzo 는 디자인 할 때 효율 문 제 를 고려 했다. 그의 압축 해제 속 도 는 gzip 의 두 배 이기 때문에 많은 디스크 읽 기와 쓰 기 를 절약 할 수 있다. 그의 압축 비 는 gzip 보다 못 하고 압축 된 파일 은 gzip 보다 절반 정도 크다. 그러나 이렇게 하면 압축 되 지 않 은 파일 보다 20% - 50% 의 저장 공간 을 절약 할 수 있다.이렇게 하면 효율 적 으로 job 집행 속 도 를 크게 높 일 수 있다.다음은 압축 대비 데이터 입 니 다. 압축 되 지 않 은 8.0GB 의 데 이 터 를 사용 하여 비교 합 니 다. 압축 형식 파일 크기 (GB) 압축 시간 압축 해제 시간 None somelogs 8.0 - - Gzip some_logs.gz 1.3 241 72 LZO some_logs. lzo 2.0 55 35 를 통 해 알 수 있 듯 이 lzo 압축 파일 은 gzip 압축 파일 보다 약간 크 지만 원본 파일 보다 훨씬 작 습 니 다. 또한 lzo 파일 의 압축 속 도 는 gzip 의 5 배 에 해당 하고 압축 을 푸 는 속 도 는 gzip 의 두 배 에 해당 합 니 다.lzo 파일 은 block boundaries 에 따라 블록 을 나 눌 수 있 습 니 다. 예 를 들 어 1.1G 의 lzo 압축 파일 입 니 다. 그러면 두 번 째 128 MB block 의 mapper 를 처리 하려 면 다음 block 의 boundary 를 확인 하여 압축 해제 작업 을 해 야 합 니 다.lzo 는 데이터 헤 더 를 쓰 지 않 고 lzo index 파일 을 실현 하여 이 파일 (foo. lzo. index) 을 모든 foo. lzo 파일 에 썼 습 니 다.이 index 파일 은 모든 block 이 데이터 에 있 는 offset 만 간단하게 포함 되 어 있 습 니 다. 이 는 offset 이 알 고 있 기 때문에 데이터 에 대한 읽 기와 쓰기 가 매우 빨 라 집 니 다.보통 90 - 100 MB / 초, 즉 10 - 12 초 에 GB 의 파일 을 다 읽 을 수 있다.이 index 파일 이 생 성 되면 lzo 기반 압축 파일 은 load 이 index 파일 을 통 해 해당 하 는 블록 을 나 누고 하나의 block 이 하나의 block 을 연결 하여 읽 을 수 있 습 니 다.따라서 각 mapper 는 정확 한 block 을 얻 을 수 있 습 니 다. 이것 은 Lzop InputStream 의 패 키 징 만 하면 hadop 의 mapreduce 에서 lzo 를 효율적으로 사용 할 수 있다 는 것 입 니 다.현재 job 의 InputFormat 이 TextInputFormat 이 라면 lzop 으로 파일 을 압축 하여 index 를 정확하게 만 들 었 는 지 확인 하고 TextInputFormat 을 LzoTextInputFormat 으로 바 꾸 면 job 가 이전 처럼 올 바 르 게 실행 되 고 더욱 빠 를 수 있 습 니 다.가끔 은 큰 파일 이 lzo 에 압축 된 후에 블록 을 나 누 지 않 아 도 하나의 mapper 에 의 해 효율 적 으로 처리 된다.
[동 전 LZO 알고리즘]
//       Lempel-Ziv     ,               
//       Markus Franz Xaver Johannes Oberhumer   LZO   
//   ,              ,              ,
//     LZO     
//       http://wildsau.idv.uni-linz.ac.at/mfx/lzo.html
// LZO    GNU   ,     ,                  
//           ,     GNU     . 
//#include <stdio.h>
//#include <string.h>
#include <stdlib.h>
#define byte unsigned char
int _stdcall compress(void *src, unsigned src_len, void *dst);
int _stdcall decompress(void *src, unsigned src_len,	void *dst);

static unsigned _do_compress (byte *in, unsigned in_len, byte *out, unsigned *out_len)
{
	static long wrkmem [16384L];
    register byte *ip;
    byte *op;
	byte *in_end = in + in_len;
    byte *ip_end = in + in_len - 13;
    byte *ii;
    byte **dict = (byte **)wrkmem;
    op = out;
	ip = in;
	ii = ip;
    ip += 4;
    for(;;)
	{
		register byte *m_pos;
		unsigned m_off;
		unsigned m_len;
		unsigned dindex;
		dindex = ((0x21*(((((((unsigned)(ip[3])<<6)^ip[2])<<5)^ip[1])<<5)^ip[0]))>>5) & 0x3fff;
		m_pos = dict [dindex];
		if(((unsigned)m_pos < (unsigned)in) ||
			(m_off = (unsigned)((unsigned)ip-(unsigned)m_pos) ) <= 0 ||
			m_off > 0xbfff)
			goto literal;
		if(m_off <= 0x0800 || m_pos[3] == ip[3])
			goto try_match;
		dindex = (dindex & 0x7ff ) ^ 0x201f;
		m_pos = dict[dindex];
		if((unsigned)(m_pos) < (unsigned)(in) ||
			(m_off = (unsigned)( (int)((unsigned)ip-(unsigned)m_pos))) <= 0 ||
			m_off > 0xbfff)
		    goto literal;
		if (m_off <= 0x0800 || m_pos[3] == ip[3])
			goto try_match;
		goto literal;
try_match:
		if(*(unsigned short*)m_pos == *(unsigned short*)ip && m_pos[2]==ip[2])
			goto match;
literal:
		dict[dindex] = ip;
		++ip;
		if (ip >= ip_end)
			break;
		continue;
match:
		dict[dindex] = ip;
		if(ip - ii > 0)
		{
			register unsigned t = ip - ii;
			
			if (t <= 3)
				op[-2] |= (byte)t;
			else if(t <= 18)
				*op++ = (byte)(t - 3);
			else
			{
				register unsigned tt = t - 18;
				*op++ = 0;
				while(tt > 255)
				{
					tt -= 255;
					*op++ = 0;
				}
				*op++ = (byte)tt;
			}
			do *op++ = *ii++; while (--t > 0);
		}
		ip += 3;
		if(m_pos[3] != *ip++ || m_pos[4] != *ip++ || m_pos[5] != *ip++ ||
			m_pos[6] != *ip++ || m_pos[7] != *ip++ || m_pos[8] != *ip++ )
		{
			--ip;
			m_len = ip - ii;
			
			if(m_off <= 0x0800 )
			{
				--m_off;
				*op++ = (byte)(((m_len - 1) << 5) | ((m_off & 7) << 2));
				*op++ = (byte)(m_off >> 3);
			}
			else
				if (m_off <= 0x4000 )
				{
					-- m_off;
					*op++ = (byte)(32 | (m_len - 2));
					goto m3_m4_offset;
				}
				else
				{
					m_off -= 0x4000;
					*op++ = (byte)(16 | ((m_off & 0x4000) >> 11) | (m_len - 2));
					goto m3_m4_offset;
				}
		}
		else
		{
			{
				byte *end = in_end;
				byte *m = m_pos + 9;
				while (ip < end && *m == *ip)
					m++, ip++;
				m_len = (ip - ii);
			}
			
			if(m_off <= 0x4000)
			{
				--m_off;
				if (m_len <= 33)
					*op++ = (byte)(32 | (m_len - 2));
				else
				{
					m_len -= 33;
					*op++=32;
					goto m3_m4_len;
				}
			}
			else
			{
				m_off -= 0x4000;
				if(m_len <= 9)
					*op++ = (byte)(16|((m_off & 0x4000) >> 11) | (m_len - 2));
				else
				{
					m_len -= 9;
					*op++ = (byte)(16 | ((m_off & 0x4000) >> 11));
m3_m4_len:
					while (m_len > 255)
					{
						m_len -= 255;
						*op++ = 0;
					}
					*op++ = (byte)m_len;
				}
			}
m3_m4_offset:
			*op++ = (byte)((m_off & 63) << 2);
			*op++ = (byte)(m_off >> 6);
		}
		ii = ip;
		if (ip >= ip_end)
			break;
    }
    *out_len = op - out;
    return (unsigned) (in_end - ii);
}
int _stdcall compress(void *in, unsigned in_len,
			 void *out)
{
    byte *op = out;
    unsigned t,out_len;
    if (in_len <= 13)
		t = in_len;
    else 
	{
		t = _do_compress (in,in_len,op,&out_len);
		op += out_len;
    }
    if (t > 0)
	{
		byte *ii = (byte*)in + in_len - t;
		if (op == (byte*)out && t <= 238)
			*op++ = (byte) ( 17 + t );
		else
			if (t <= 3)
				op[-2] |= (byte)t ;
			else
				if (t <= 18)
					*op++ = (byte)(t-3);
				else
				{
					unsigned tt = t - 18;
					*op++ = 0;
					while (tt > 255) 
					{
						tt -= 255;
						*op++ = 0;
					}
					*op++ = (byte)tt;
				}
				do *op++ = *ii++; while (--t > 0);
    }
    *op++ = 17;
    *op++ = 0;
    *op++ = 0;
    return (op - (byte*)out);
}
int _stdcall decompress (void *in, unsigned in_len,
				void *out)
{
    register byte *op;
    register byte *ip;
    register unsigned t;
    register byte *m_pos;
    byte *ip_end = (byte*)in + in_len;
    op = out;
    ip = in;
    if(*ip > 17)
	{
		t = *ip++ - 17;
		if (t < 4)
			goto match_next;
		do *op++ = *ip++; while (--t > 0);
		goto first_literal_run;
    }
    for(;;)
	{
		t = *ip++;
		if (t >= 16) goto match;
		if (t == 0)
		{
			while (*ip == 0)
			{
				t += 255;
				ip++;
			}
			t += 15 + *ip++;
		}
		* (unsigned *) op = * ( unsigned *) ip;
		op += 4; ip += 4;
		if (--t > 0)
		{
			if (t >= 4)
			{
				do
				{
					* (unsigned * ) op = * ( unsigned * ) ip;
					op += 4; ip += 4; t -= 4;
				} while (t >= 4);
				if (t > 0) do *op++ = *ip++; while (--t > 0);
			}
			else
				do *op++ = *ip++; while (--t > 0);
		}
first_literal_run:
		t = *ip++;
		if (t >= 16)
			goto match;
		m_pos = op - 0x0801;
		m_pos -= t >> 2;
		m_pos -= *ip++ << 2;
		*op++ = *m_pos++; *op++ = *m_pos++; *op++ = *m_pos;
		goto match_done;
		for(;;)
		{
match:
		if (t >= 64)
		{
			m_pos = op - 1;
			m_pos -= (t >> 2) & 7;
			m_pos -= *ip++ << 3;
			t = (t >> 5) - 1;
			goto copy_match;
		}
		else 
			if (t >= 32)
			{
				t &= 31;
				if (t == 0)	
				{
					while (*ip == 0) 
					{
						t += 255;
						ip++;
					}
					t += 31 + *ip++;
				}
				m_pos = op - 1;
				m_pos -= (* ( unsigned short * ) ip) >> 2;
				ip += 2;
			}
			else
				if (t >= 16) 
				{
					m_pos = op;
					m_pos -= (t & 8) << 11;
					t &= 7;
					if (t == 0)
					{
						while (*ip == 0)
						{
							t += 255;
							ip++;
						}
						t += 7 + *ip++;
					}
					m_pos -= (* ( unsigned short *) ip) >> 2;
					ip += 2;
					if (m_pos == op)
						goto eof_found;
					m_pos -= 0x4000;
				}
				else 
				{
					m_pos = op - 1;
					m_pos -= t >> 2;
					m_pos -= *ip++ << 2;
					*op++ = *m_pos++; *op++ = *m_pos;
					goto match_done;
				}
				if (t >= 6 && (op - m_pos) >= 4) 
				{
					* (unsigned *) op = * ( unsigned *) m_pos;
					op += 4; m_pos += 4; t -= 2;
					do
					{
						* (unsigned *) op = * ( unsigned *) m_pos;
						op += 4; m_pos += 4; t -= 4;
					}while (t >= 4);
					if (t > 0)
						do *op++ = *m_pos++; while (--t > 0);
				}
				else
				{
copy_match:
				*op++ = *m_pos++; *op++ = *m_pos++;
				do *op++ = *m_pos++; while (--t > 0);
				}
match_done:
				t = ip[-2] & 3;
				if (t == 0)	break;
match_next:
				do *op++ = *ip++; while (--t > 0);
				t = *ip++;
		}
   }
eof_found:
   if (ip != ip_end) return -1;
   return (op - (byte*)out);
}


좋은 웹페이지 즐겨찾기