Recently I worked on a case of migrating a data warehouse from Azure to AWS. I have been meaning to summarize the tools and methods I came across in this case.
Original architecture:
Target architecture:
On AWS, the data platform is built on S3 + AWS Glue + Redshift: S3 serves as the data lake storage, Glue handles ETL/ELT, and Redshift, a PostgreSQL-based columnar store, serves as the data warehouse.
We built a demo on this stack to implement the following scenario:
Fetch weather information for a specified region from a weather API, run it through simple processing (ETL), and store it in a designated Redshift table.
Steps:
1. Prepare the resources and grant the corresponding IAM permissions
Resources: S3, Glue, Redshift. Create the S3 prefix pocs3inputdev/weather_info/.
Create the Redshift table:
CREATE TABLE weatherdata (
    city VARCHAR(256) ENCODE ZSTD,
    province VARCHAR(256) ENCODE ZSTD,
    adcode VARCHAR(64) ENCODE ZSTD,
    timestamp TIMESTAMP ENCODE DELTA32K,
    weather VARCHAR(256) ENCODE ZSTD,
    temperature VARCHAR(64) ENCODE ZSTD,
    winddirection VARCHAR(128) ENCODE ZSTD,
    windpower VARCHAR(64) ENCODE ZSTD,
    humidity VARCHAR(64) ENCODE ZSTD,
    reporttime VARCHAR(256) ENCODE ZSTD
)
DISTKEY(adcode)
SORTKEY(timestamp);
Note the Redshift DDL syntax: the main points are DISTKEY, SORTKEY, and the column compression encodings (ENCODE).
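The DDL can simply be run in the Redshift query editor. As a minimal sketch of doing the same programmatically through the Redshift Data API with boto3 (the cluster identifier, database, user, and SQL file name below are placeholders, not values from this PoC):

import boto3

# Placeholder connection details; the PoC's actual cluster settings are not given in this post.
client = boto3.client("redshift-data")

with open("create_weatherdata.sql") as f:   # file containing the DDL shown above
    ddl = f.read()

resp = client.execute_statement(
    ClusterIdentifier="poc-redshift-cluster",  # hypothetical cluster name
    Database="dev",                            # hypothetical database
    DbUser="awsuser",                          # hypothetical database user
    Sql=ddl,
)
print(resp["Id"])  # statement id; poll with describe_statement to check completion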
Permissions: grant the Glue job's IAM role S3 PUT/DELETE access (or S3 full access for the PoC) and Redshift full access.
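As a sketch, the S3 permissions could be granted with an inline policy on the Glue job's role; the role and policy names below are hypothetical placeholders (a production setup should scope actions down rather than use full access):

import json
import boto3

iam = boto3.client("iam")

# Minimal S3 policy for the weather_info/ prefix. In China regions such as
# cn-northwest-1 the ARN partition is "aws-cn" rather than "aws".
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"],
        "Resource": [
            "arn:aws-cn:s3:::pocs3inputdev",
            "arn:aws-cn:s3:::pocs3inputdev/weather_info/*",
        ],
    }],
}

iam.put_role_policy(
    RoleName="glue-poc-role",             # hypothetical Glue job role
    PolicyName="glue-weather-s3-access",  # hypothetical policy name
    PolicyDocument=json.dumps(policy),
)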
2. All ETL tables are managed in the Glue Data Catalog, so the tables need to be created in Glue first
Create the database dw-pbi-poc-glue-db.
Use "Add table" to create the table s3_weather_info, define the columns manually, and point it at the S3 path s3://pocs3inputdev/weather_info/.
(Alternatively, use "Add tables using crawler": point the crawler at the S3 path and it will detect the CSV header and create the table automatically; see the sketch below.)
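A boto3 sketch of the crawler route (the crawler name and IAM role are hypothetical; the database name and S3 path are the ones used above):

import boto3

glue = boto3.client("glue")

# Crawl the weather_info/ prefix, infer the CSV schema from the header,
# and register the resulting table in dw-pbi-poc-glue-db.
glue.create_crawler(
    Name="weather-info-crawler",   # hypothetical crawler name
    Role="glue-poc-role",          # hypothetical IAM role
    DatabaseName="dw-pbi-poc-glue-db",
    Targets={"S3Targets": [{"Path": "s3://pocs3inputdev/weather_info/"}]},
)
glue.start_crawler(Name="weather-info-crawler")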
3. Create jobs in Glue and assemble them into a workflow to form the pipeline
We design two jobs. job1 runs in Python shell mode: it calls the API and stores the result on S3 as CSV.
job2 reads the data from S3, applies the ETL (simple field mapping, etc.), and writes it to the target Redshift table.
job1:
glue-->ETL jobs-->script editor-->python
import requests
import pandas as pd
import boto3
from datetime import datetime


def get_weather_data_and_save_to_s3():
    # API endpoint and parameters
    url = "https://restapi.amap.com/v3/weather/weatherInfo"
    params = {
        'key': 'd84fdf9cc1f19710bb7ff6c3cd924726',
        'city': '110102',
        'extensions': 'base',
        'output': 'JSON'
    }

    # Make HTTP GET request
    response = requests.get(url, params=params)

    if response.status_code == 200:
        data = response.json()

        # Process the weather data
        if data.get('status') == '1' and 'lives' in data:
            weather_info = data['lives'][0]  # Get the first (and likely only) record

            # Create a DataFrame with the weather data
            df = pd.DataFrame([{
                'timestamp': datetime.now().isoformat(),
                'province': weather_info.get('province', ''),
                'city': weather_info.get('city', ''),
                'adcode': weather_info.get('adcode', ''),
                'weather': weather_info.get('weather', ''),
                'temperature': weather_info.get('temperature', ''),
                'winddirection': weather_info.get('winddirection', ''),
                'windpower': weather_info.get('windpower', ''),
                'humidity': weather_info.get('humidity', ''),
                'reporttime': weather_info.get('reporttime', '')
            }])

            # Serialize the DataFrame to CSV in memory
            csv_buffer = df.to_csv(index=False)

            try:
                # Upload the CSV data to S3
                s3_client = boto3.client('s3')
                bucket_name = 'pocs3inputdev'  # Replace with your bucket name
                file_name = "weather_info/weather_data.csv"

                s3_client.put_object(
                    Bucket=bucket_name,
                    Key=file_name,
                    Body=csv_buffer,
                    ContentType='text/csv'
                )

                print(f"Successfully saved weather data to s3://{bucket_name}/{file_name}")
                return f"s3://{bucket_name}/{file_name}"
            except Exception as e:
                print(f"Error: {e}")
                raise
        else:
            print("Error: Invalid response data from weather API")
            return None
    else:
        print(f"Error: HTTP {response.status_code} - {response.text}")
        return None


# Execute the function
if __name__ == "__main__":
    get_weather_data_and_save_to_s3()
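The script above is pasted into the console editor. Equivalently, the Python shell job could be registered with boto3 roughly as follows (the job name, role, and script location are placeholders):

import boto3

glue = boto3.client("glue")

# Register job1 as a Glue Python shell job; 0.0625 DPU is the smallest capacity.
glue.create_job(
    Name="job1-weather-api-to-s3",   # hypothetical job name
    Role="glue-poc-role",            # hypothetical IAM role
    Command={
        "Name": "pythonshell",
        "ScriptLocation": "s3://aws-glue-assets-455512573562-cn-northwest-1/scripts/job1.py",  # placeholder path
        "PythonVersion": "3.9",
    },
    MaxCapacity=0.0625,
)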
job2:
glue-->ETL jobs-->Visual ETL
Drag and drop the corresponding nodes; the final generated script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Script generated for node Amazon S3: read the CSV written by job1
AmazonS3_node1754055290874 = glueContext.create_dynamic_frame.from_options(
    format_options={"quoteChar": "\"", "withHeader": True, "separator": ",", "optimizePerformance": False},
    connection_type="s3", format="csv",
    connection_options={"paths": ["s3://pocs3inputdev/weather_info/weather_data.csv"], "recurse": True},
    transformation_ctx="AmazonS3_node1754055290874")

# Script generated for node Rename Field: winddirection -> direction
RenameField_node1754055306801 = RenameField.apply(
    frame=AmazonS3_node1754055290874, old_name="winddirection",
    new_name="direction", transformation_ctx="RenameField_node1754055306801")

# Script generated for node Amazon Redshift: load into public.weatherdata
AmazonRedshift_node1754055312487 = glueContext.write_dynamic_frame.from_options(
    frame=RenameField_node1754055306801, connection_type="redshift",
    connection_options={
        "redshiftTmpDir": "s3://aws-glue-assets-455512573562-cn-northwest-1/temporary/",
        "useConnectionProperties": "true", "dbtable": "public.weatherdata",
        "connectionName": "Redshift connection",
        "preactions": "CREATE TABLE IF NOT EXISTS public.weatherdata (timestamp VARCHAR, province VARCHAR, city VARCHAR, adcode VARCHAR, weather VARCHAR, temperature VARCHAR, direction VARCHAR, windpower VARCHAR, humidity VARCHAR, reporttime VARCHAR);"},
    transformation_ctx="AmazonRedshift_node1754055312487")

job.commit()
Create a workflow and chain job1 and job2 together with triggers.
The trigger can be set to run daily, weekly, on demand, or on a custom schedule; a sketch follows.
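For example, the workflow could combine a daily scheduled trigger for job1 with a conditional trigger that starts job2 once job1 succeeds; a boto3 sketch using the hypothetical names from the earlier snippets:

import boto3

glue = boto3.client("glue")

glue.create_workflow(Name="weather-poc-workflow")  # hypothetical workflow name

# Scheduled trigger: start job1 daily at 00:00 UTC.
glue.create_trigger(
    Name="daily-start-job1",
    WorkflowName="weather-poc-workflow",
    Type="SCHEDULED",
    Schedule="cron(0 0 * * ? *)",
    Actions=[{"JobName": "job1-weather-api-to-s3"}],
    StartOnCreation=True,
)

# Conditional trigger: start job2 only after job1 finishes successfully.
glue.create_trigger(
    Name="job1-succeeded-start-job2",
    WorkflowName="weather-poc-workflow",
    Type="CONDITIONAL",
    Predicate={"Conditions": [{"LogicalOperator": "EQUALS",
                               "JobName": "job1-weather-api-to-s3",
                               "State": "SUCCEEDED"}]},
    Actions=[{"JobName": "job2-s3-to-redshift"}],   # hypothetical name for job2
    StartOnCreation=True,
)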