Spark에서 OpenStack Swift 기반 IBM Object Storage에 연결해 본 메모

데이터 소스는 여러 가지면서도, 서둘러 파일 그대로 스토리지에 넣어 두는 것이 장애물은 낮을 것 같다. Spark와 같은 빅데이터 전제라면 로그 파일 등이 상정되는 경우도 많을 것입니다만, 일반적인 비즈니스 데이터는 피해서 통과할 수 없기 때문에 우선은 CSV, 라고 하는 것으로 CSV 주위를 조금 시험해 보았을 때 메모. (IBM Data Scientist Experience , Python 2 with Spark 2.0에서)

준비



CSV 파일을 Swift 기반 IBM Object Storage에 업로드하십시오. 화면 오른쪽의 "Drop you file here or browse your files to add a new file "을 사용. 스크린 샷과 같이 baseball.csv, cars.csv, whiskey.csv를 업로드했습니다.


Object Storage에서 데이터를 읽어 보았습니다.



먼저 DSX 코드에서 삽입 된 코드로 시도했습니다.


여기 구성을 설정하는 처리와 CSV 데이터를 DataFrame에로드하는 처리가 있습니다.
(1) Spark에서 Hadoop 구성 설정
Accessing OpenStack Swift from Spark에 설명 된 구성 매개 변수가 IBM Object Storage에 업로드 된 파일에 맞게 설정된 패턴.

test.py
# @hidden_cell
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# This function accesses a file in your Object Storage. The definition contains your credentials.
# You might want to remove those credentials before you share (name):
def set_hadoop_config_with_credentials_xxxxxxxxxxxxxxxxxxcxxxxxcc(name):
    """This function sets the Hadoop configuration so it is possible to
    access data from Bluemix Object Storage V3 using Spark"""

    prefix = 'fs.swift.service.' + name
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + '.auth.url', 'https://identity.open.softlayer.com'+'/v3/auth/tokens')
    hconf.set(prefix + '.auth.endpoint.prefix', 'endpoints')
    hconf.set(prefix + '.tenant', '4dbbbca3a8ec42eea9349120fb91dcf9')
    hconf.set(prefix + '.username', 'xxxxxxxxxcxxxxxxcccxxxxxccc')
    hconf.set(prefix + '.password', 'Xxxxxxxxxxxxxxxxx')
    hconf.setInt(prefix + '.http.port', 8080)
    hconf.set(prefix + '.region', 'dallas')
    hconf.setBoolean(prefix + '.public', True)

# you can choose any name
name = 'keystone'
set_hadoop_config_with_credentials_Xxxxxxxxxxxxxxxxxxcxxxxxc(name)

(2) Spark DataFrame에 CSV 데이터로드

test.py
df_data_1=sqlContext.read.format('com.databricks.spark.csv')\
    .options(header='true',inferschema='true')\
    .load("swift://PredictiveAnalyticsProject2." + name + "/whiskey.csv")

이것은 Spark 2.0 이전에 필요했던 spark-csv라는 패키지를 사용하는 것 같습니다.
spark2.0부터는 Spark DataFrame이 직접 csv를 취급할 수 있는 것 같고, read.csv()도 사용할 수 있어 편리하게 되어 있다.


Object Storage에 데이터를 내보냈습니다.



whisky.csv를 읽은 Spark DataFrame의 내용을 whiskey_new.csv라는 이름으로 Object Storage에 내보냈다.
(1) Spark DataFrame의 write로 출력해 보았다
간단하게 write.csv ()라고 쓰면 OK.


mode는 여기 에 있는 것처럼 Save Mode를 이용할 수 있는 것 같습니다.
출력되는 파일을 보면 아래 스크린 샷과 같이 하나의 CSV가 아니라 여러 파일로 분할되어 출력되었습니다. 여러 노드에서 처리하고 있기 때문이라고 생각합니다. (이상한 것으로, 이것을 Spark에서 다시 read.csv로 읽을 때는 하나의 textFile, csv로 취급되기 때문에 문제없는 것 같습니다).

하지만 하나의 파일로 내고 싶은 사람도 나 이외에도있는 것 같고, 하나의 파일에 출력할 수 없는지 문의하고 있는 Q&A도 있었습니다(h tp : / / s tac ゔ ぇ rf ぉ w. 코 m / 쿠에 s 치온 s / 31674530 / w 리테 - 신 g ぇ - csv - ぃ ぇ - 우신 g - 빠 rk-csv)

(2) Spark를 사용하지 않고 REST API를 사용해 보았습니다.
대량 데이터를 취급하는 경우에는 적합하지 않다고 생각합니다만, 이하와 같은 코드로 Object Storage상에 하나의 CSV로서 put 할 수 있습니다. (DSX가 Pandas DataFrame 작성용으로 인서트하는 코드의 get 부분을 put로 변경하고 있을 뿐입니다. 조립하는 것 같습니다)

put_sample.py
def put_object_storage_file_with_credentials_xxxxxxxxxx(container, filename, indata):
    url1 = ''.join(['https://identity.open.softlayer.com', '/v3/auth/tokens'])
    data = {'auth': {'identity': {'methods': ['password'],
            'password': {'user': {'name': 'member_1825cd3bc875420fc629ccfd22c22e20433a7ac9','domain': {'id': '07e33cca1abe47d293b86de49f1aa8bc'},
            'password': 'xxxxxxxxxx'}}}}}
    headers1 = {'Content-Type': 'application/json'}
    resp1 = requests.post(url=url1, data=json.dumps(data), headers=headers1)
    resp1_body = resp1.json()
    for e1 in resp1_body['token']['catalog']:
        if(e1['type']=='object-store'):
            for e2 in e1['endpoints']:
                        if(e2['interface']=='public'and e2['region']=='dallas'):
                            url2 = ''.join([e2['url'],'/', container, '/', filename])
    s_subject_token = resp1.headers['x-subject-token']
    headers2 = {'X-Auth-Token': s_subject_token, 'accept': 'application/json'}
    resp2 = requests.put(url=url2, headers=headers2 , data=indata)
    print resp2
    return (resp2.content)


put_object_storage_file_with_credentials_xxxxxxxxxx( 'PredictiveAnalyticsProject2', 'whiskey_new_s.csv' , df_data_2.to_csv( index = False ) )


추가



위의 REST API로 Put하는 이미지로 Python의 swiftclient를 이용하는 방법도 있는 것 같습니다만, IBM의 Data Scientist Experience 환경에서는 사용할 수 없는 것 같습니다.
Using IBM Object Storage in Bluemix, with Python

좋은 웹페이지 즐겨찾기