diff --git a/package.xml b/package.xml
index cbe0621ab..78e85b648 100755
--- a/package.xml
+++ b/package.xml
@@ -203,6 +203,13 @@
datax
+
+ s3writer/target/datax/
+
+ **/*.*
+
+ datax
+
ftpwriter/target/datax/
diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java
index 2e7fe079f..1b8f0ad17 100755
--- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java
+++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java
@@ -35,4 +35,13 @@ public class Key {
// writer file type suffix, like .txt .csv
public static final String SUFFIX = "suffix";
+
+ public static final String S3_BUCKET = "s3Bucket";
+
+ public static final String S3_ACCESS_KEY = "s3AccessKey";
+
+ public static final String S3_SECRET_KEY = "s3SecretKey";
+
+ public static final String S3_ENDPOINT = "s3Endpoint";
+
}
diff --git a/pom.xml b/pom.xml
index 15c5b98ca..ba3fd82e0 100755
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,7 @@
rdbmswriter
hbase11xwriter
hbase094xwriter
+ s3writer
plugin-rdbms-util
diff --git a/s3writer/doc/s3writer.md b/s3writer/doc/s3writer.md
new file mode 100644
index 000000000..231dc2c19
--- /dev/null
+++ b/s3writer/doc/s3writer.md
@@ -0,0 +1,207 @@
+# DataX S3Writer 说明
+
+
+------------
+
+## 1 快速介绍
+
+S3Writer提供了向S3写入类CSV格式的一个或者多个表文件。
+
+**写入S3文件内容存放的是一张逻辑意义上的二维表,例如CSV格式的文本信息。**
+
+
+## 2 功能与限制
+
+S3Writer实现了从DataX协议转为S3TXT文件功能,S3文件本身是无结构化数据存储,S3Writer如下几个方面约定:
+
+1. 支持且仅支持写入 TXT的文件,且要求TXT中shema为一张二维表。
+
+2. 支持类CSV格式文件,自定义分隔符。
+
+3. 支持文本压缩,现有压缩格式为gzip、bzip2。
+
+6. 支持多线程写入,每个线程写入不同子文件。
+
+7. 文件支持滚动,当文件大于某个size值或者行数值,文件需要切换。 [暂不支持]
+
+我们不能做到:
+
+1. 单个文件不能支持并发写入。
+
+
+## 3 功能说明
+
+
+### 3.1 配置样例
+
+```json
+{
+ {
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "mysqlreader",
+ "parameter": {
+ "column": ["*"],
+ "connection": [
+ {
+ "jdbcUrl": ["jdbc:mysql://xxx:3306/xxx"],
+ "table": ["yyy"]
+ }
+ ],
+ "password": "root",
+ "username": "root",
+ "where": ""
+ }
+ },
+ "writer": {
+ "name": "s3writer",
+ "parameter": {
+ "s3Bucket": "xxx",
+ "s3AccessKey": "xxx",
+ "s3SecretKey": "xxx+",
+ "s3Endpoint": "s3.cn-north-1.amazonaws.com.cn",
+
+ "dateFormat": "",
+ "fieldDelimiter": ",",
+ "fileName": "yyy",
+ "path": "xxx/xxx",
+ "writeMode": "truncate"
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 10
+ }
+ }
+ }
+ }
+}
+```
+
+### 3.2 参数说明
+
+* **path**
+
+ * 描述:S3文件系统的路径信息,S3Writer会写入Path目录下属多个文件。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **fileName**
+
+ * 描述:S3Writer写入的文件名,该文件名会添加随机的后缀作为每个线程写入实际文件名。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **writeMode**
+
+ * 描述:S3Writer写入前数据清理处理模式:
+
+ * truncate,写入前清理目录下一fileName前缀的所有文件。
+ * append,写入前不做任何处理,DataX S3Writer直接使用filename写入,并保证文件名不冲突。
+ * nonConflict,如果目录下有fileName前缀的文件,直接报错。
+
+ * 必选:是
+
+ * 默认值:无
+
+* **fieldDelimiter**
+
+ * 描述:读取的字段分隔符
+
+ * 必选:否
+
+ * 默认值:,
+
+* **compress**
+
+ * 描述:文本压缩类型,默认不填写意味着没有压缩。支持压缩类型为zip、lzo、lzop、tgz、bzip2。
+
+ * 必选:否
+
+ * 默认值:无压缩
+
+* **encoding**
+
+ * 描述:读取文件的编码配置。
+
+ * 必选:否
+
+ * 默认值:utf-8
+
+
+* **nullFormat**
+
+ * 描述:文本文件中无法使用标准字符串定义null(空指针),DataX提供nullFormat定义哪些字符串可以表示为null。
+
+ 例如如果用户配置: nullFormat="\N",那么如果源头数据是"\N",DataX视作null字段。
+
+ * 必选:否
+
+ * 默认值:\N
+
+* **dateFormat**
+
+ * 描述:日期类型的数据序列化到文件中时的格式,例如 "dateFormat": "yyyy-MM-dd"。
+
+ * 必选:否
+
+ * 默认值:无
+
+* **fileFormat**
+
+ * 描述:文件写出的格式,包括csv (http://zh.wikipedia.org/wiki/%E9%80%97%E5%8F%B7%E5%88%86%E9%9A%94%E5%80%BC) 和text两种,csv是严格的csv格式,如果待写数据包括列分隔符,则会按照csv的转义语法转义,转义符号为双引号";text格式是用列分隔符简单分割待写数据,对于待写数据包括列分隔符情况下不做转义。
+
+ * 必选:否
+
+ * 默认值:text
+
+* **header**
+
+ * 描述:txt写出时的表头,示例['id', 'name', 'age']。
+
+ * 必选:否
+
+ * 默认值:无
+
+### 3.3 类型转换
+
+
+S3文件本身不提供数据类型,该类型是DataX S3Writer定义:
+
+| DataX 内部类型| S3文件 数据类型 |
+| -------- | ----- |
+|
+| Long |Long |
+| Double |Double|
+| String |String|
+| Boolean |Boolean |
+| Date |Date |
+
+其中:
+
+* S3文件 Long是指S3文件文本中使用整形的字符串表示形式,例如"19901219"。
+* S3文件 Double是指S3文件文本中使用Double的字符串表示形式,例如"3.1415"。
+* S3文件 Boolean是指S3文件文本中使用Boolean的字符串表示形式,例如"true"、"false"。不区分大小写。
+* S3文件 Date是指S3文件文本中使用Date的字符串表示形式,例如"2014-12-31",Date可以指定format格式。
+
+
+## 4 性能报告
+
+
+## 5 约束限制
+
+略
+
+## 6 FAQ
+
+略
+
+
diff --git a/s3writer/pom.xml b/s3writer/pom.xml
new file mode 100755
index 000000000..b3e13bf54
--- /dev/null
+++ b/s3writer/pom.xml
@@ -0,0 +1,88 @@
+
+ 4.0.0
+
+ com.alibaba.datax
+ datax-all
+ 0.0.1-SNAPSHOT
+
+
+ s3writer
+ s3writer
+ S3Writer提供了本地写入TEXT功能,建议开发、测试环境使用。
+ jar
+
+
+
+ com.alibaba.datax
+ datax-common
+ ${datax-project-version}
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+ com.alibaba.datax
+ plugin-unstructured-storage-util
+ ${datax-project-version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+ ch.qos.logback
+ logback-classic
+
+
+ com.google.guava
+ guava
+ 16.0.1
+
+
+ com.amazonaws
+ aws-java-sdk-core
+ 1.11.52
+
+
+ com.amazonaws
+ aws-java-sdk-s3
+ 1.11.52
+
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ 1.6
+ 1.6
+ ${project-sourceEncoding}
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+
diff --git a/s3writer/src/main/assembly/package.xml b/s3writer/src/main/assembly/package.xml
new file mode 100755
index 000000000..d5b762bbd
--- /dev/null
+++ b/s3writer/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+ plugin_job_template.json
+
+ plugin/writer/s3writer
+
+
+ target/
+
+ s3writer-0.0.1-SNAPSHOT.jar
+
+ plugin/writer/s3writer
+
+
+
+
+
+ false
+ plugin/writer/s3writer/libs
+ runtime
+
+
+
diff --git a/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/Key.java b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/Key.java
new file mode 100755
index 000000000..f1169a841
--- /dev/null
+++ b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/Key.java
@@ -0,0 +1,6 @@
+package com.alibaba.datax.plugin.writer.s3writer;
+
+public class Key {
+ // must have
+ public static final String PATH = "path";
+}
diff --git a/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3Util.java b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3Util.java
new file mode 100644
index 000000000..5d9eb48ae
--- /dev/null
+++ b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3Util.java
@@ -0,0 +1,63 @@
+package com.alibaba.datax.plugin.writer.s3writer;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.model.Bucket;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
+import com.amazonaws.services.s3.transfer.TransferProgress;
+import com.amazonaws.services.s3.transfer.Upload;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+
+public class S3Util {
+
+ private final String bucket;
+ private final String accessKey;
+ private final String secretKey;
+ private final String endpoint;
+ private final AmazonS3 s3Client;
+
+ public S3Util(String bucket, String accessKey, String secretKey, String endpoint) {
+ this.bucket = bucket;
+ this.accessKey = accessKey;
+ this.secretKey = secretKey;
+ this.endpoint = endpoint;
+
+ this.s3Client = new AmazonS3Client(new BasicAWSCredentials(this.accessKey, this.secretKey));
+ this.s3Client.setEndpoint(this.endpoint);
+ }
+
+ public void upload(String from, String to) {
+ TransferManager transferManager = new TransferManager(this.s3Client);
+
+ Upload upload = transferManager.upload(this.bucket, to, new File(from));
+ TransferProgress p = upload.getProgress();
+ while (upload.isDone() == false) {
+ int percent = (int) (p.getPercentTransferred());
+ System.out.print("\r" + from + " - " + "[ " + percent + "% ] "
+ + p.getBytesTransferred() + " / " + p.getTotalBytesToTransfer());
+ try {
+ Thread.sleep(500);
+ } catch (Exception e) {
+
+ }
+ }
+ try {
+ upload.waitForCompletion();
+ s3Client.setObjectAcl(this.bucket, to, CannedAccessControlList.PublicRead);
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ } finally {
+ transferManager.shutdownNow();
+ }
+ System.out.print("\r" + from + " - " + "[ 100% ] "
+ + p.getBytesTransferred() + " / " + p.getTotalBytesToTransfer());
+ }
+}
\ No newline at end of file
diff --git a/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3Writer.java b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3Writer.java
new file mode 100755
index 000000000..3a6afcdea
--- /dev/null
+++ b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3Writer.java
@@ -0,0 +1,357 @@
+package com.alibaba.datax.plugin.writer.s3writer;
+
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.unstructuredstorage.writer.UnstructuredStorageWriterUtil;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.filefilter.PrefixFileFilter;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Created by haiwei.luo on 14-9-17.
+ */
+public class S3Writer extends Writer {
+ public static class Job extends Writer.Job {
+ private static final Logger LOG = LoggerFactory.getLogger(Job.class);
+
+ private Configuration writerSliceConfig = null;
+
+ @Override
+ public void init() {
+ this.writerSliceConfig = this.getPluginJobConf();
+ this.validateParameter();
+ String dateFormatOld = this.writerSliceConfig
+ .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FORMAT);
+ String dateFormatNew = this.writerSliceConfig
+ .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.DATE_FORMAT);
+ if (null == dateFormatNew) {
+ this.writerSliceConfig
+ .set(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.DATE_FORMAT,
+ dateFormatOld);
+ }
+ if (null != dateFormatOld) {
+ LOG.warn("您使用format配置日期格式化, 这是不推荐的行为, 请优先使用dateFormat配置项, 两项同时存在则使用dateFormat.");
+ }
+ UnstructuredStorageWriterUtil
+ .validateParameter(this.writerSliceConfig);
+ }
+
+ private void validateParameter() {
+ this.writerSliceConfig
+ .getNecessaryValue(
+ com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FILE_NAME,
+ S3WriterErrorCode.REQUIRED_VALUE);
+
+ String path = this.writerSliceConfig.getNecessaryValue(Key.PATH,
+ S3WriterErrorCode.REQUIRED_VALUE);
+
+ try {
+ // warn: 这里用户需要配一个目录
+ File dir = new File(path);
+ if (dir.isFile()) {
+ throw DataXException
+ .asDataXException(
+ S3WriterErrorCode.ILLEGAL_VALUE,
+ String.format(
+ "您配置的path: [%s] 不是一个合法的目录, 请您注意文件重名, 不合法目录名等情况.",
+ path));
+ }
+ if (!dir.exists()) {
+ boolean createdOk = dir.mkdirs();
+ if (!createdOk) {
+ throw DataXException
+ .asDataXException(
+ S3WriterErrorCode.CONFIG_INVALID_EXCEPTION,
+ String.format("您指定的文件路径 : [%s] 创建失败.",
+ path));
+ }
+ }
+ } catch (SecurityException se) {
+ throw DataXException.asDataXException(
+ S3WriterErrorCode.SECURITY_NOT_ENOUGH,
+ String.format("您没有权限创建文件路径 : [%s] ", path), se);
+ }
+ }
+
+ @Override
+ public void prepare() {
+ String path = this.writerSliceConfig.getString(Key.PATH);
+ String fileName = this.writerSliceConfig
+ .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FILE_NAME);
+ String writeMode = this.writerSliceConfig
+ .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.WRITE_MODE);
+ // truncate option handler
+ if ("truncate".equals(writeMode)) {
+ LOG.info(String.format(
+ "由于您配置了writeMode truncate, 开始清理 [%s] 下面以 [%s] 开头的内容",
+ path, fileName));
+ File dir = new File(path);
+ // warn:需要判断文件是否存在,不存在时,不能删除
+ try {
+ if (dir.exists()) {
+ // warn:不要使用FileUtils.deleteQuietly(dir);
+ FilenameFilter filter = new PrefixFileFilter(fileName);
+ File[] filesWithFileNamePrefix = dir.listFiles(filter);
+ for (File eachFile : filesWithFileNamePrefix) {
+ LOG.info(String.format("delete file [%s].",
+ eachFile.getName()));
+ FileUtils.forceDelete(eachFile);
+ }
+ // FileUtils.cleanDirectory(dir);
+ }
+ } catch (NullPointerException npe) {
+ throw DataXException
+ .asDataXException(
+ S3WriterErrorCode.Write_FILE_ERROR,
+ String.format("您配置的目录清空时出现空指针异常 : [%s]",
+ path), npe);
+ } catch (IllegalArgumentException iae) {
+ throw DataXException.asDataXException(
+ S3WriterErrorCode.SECURITY_NOT_ENOUGH,
+ String.format("您配置的目录参数异常 : [%s]", path));
+ } catch (SecurityException se) {
+ throw DataXException.asDataXException(
+ S3WriterErrorCode.SECURITY_NOT_ENOUGH,
+ String.format("您没有权限查看目录 : [%s]", path));
+ } catch (IOException e) {
+ throw DataXException.asDataXException(
+ S3WriterErrorCode.Write_FILE_ERROR,
+ String.format("无法清空目录 : [%s]", path), e);
+ }
+ } else if ("append".equals(writeMode)) {
+ LOG.info(String
+ .format("由于您配置了writeMode append, 写入前不做清理工作, [%s] 目录下写入相应文件名前缀 [%s] 的文件",
+ path, fileName));
+ } else if ("nonConflict".equals(writeMode)) {
+ LOG.info(String.format(
+ "由于您配置了writeMode nonConflict, 开始检查 [%s] 下面的内容", path));
+ // warn: check two times about exists, mkdirs
+ File dir = new File(path);
+ try {
+ if (dir.exists()) {
+ if (dir.isFile()) {
+ throw DataXException
+ .asDataXException(
+ S3WriterErrorCode.ILLEGAL_VALUE,
+ String.format(
+ "您配置的path: [%s] 不是一个合法的目录, 请您注意文件重名, 不合法目录名等情况.",
+ path));
+ }
+ // fileName is not null
+ FilenameFilter filter = new PrefixFileFilter(fileName);
+ File[] filesWithFileNamePrefix = dir.listFiles(filter);
+ if (filesWithFileNamePrefix.length > 0) {
+ List allFiles = new ArrayList();
+ for (File eachFile : filesWithFileNamePrefix) {
+ allFiles.add(eachFile.getName());
+ }
+ LOG.error(String.format("冲突文件列表为: [%s]",
+ StringUtils.join(allFiles, ",")));
+ throw DataXException
+ .asDataXException(
+ S3WriterErrorCode.ILLEGAL_VALUE,
+ String.format(
+ "您配置的path: [%s] 目录不为空, 下面存在其他文件或文件夹.",
+ path));
+ }
+ } else {
+ boolean createdOk = dir.mkdirs();
+ if (!createdOk) {
+ throw DataXException
+ .asDataXException(
+ S3WriterErrorCode.CONFIG_INVALID_EXCEPTION,
+ String.format(
+ "您指定的文件路径 : [%s] 创建失败.",
+ path));
+ }
+ }
+ } catch (SecurityException se) {
+ throw DataXException.asDataXException(
+ S3WriterErrorCode.SECURITY_NOT_ENOUGH,
+ String.format("您没有权限查看目录 : [%s]", path));
+ }
+ } else {
+ throw DataXException
+ .asDataXException(
+ S3WriterErrorCode.ILLEGAL_VALUE,
+ String.format(
+ "仅支持 truncate, append, nonConflict 三种模式, 不支持您配置的 writeMode 模式 : [%s]",
+ writeMode));
+ }
+ }
+
+ @Override
+ public void post() {
+
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+
+ @Override
+ public List split(int mandatoryNumber) {
+ LOG.info("begin do split...");
+ List writerSplitConfigs = new ArrayList();
+ String filePrefix = this.writerSliceConfig
+ .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FILE_NAME);
+
+ Set allFiles = new HashSet();
+ String path = null;
+ try {
+ path = this.writerSliceConfig.getString(Key.PATH);
+ File dir = new File(path);
+ allFiles.addAll(Arrays.asList(dir.list()));
+ } catch (SecurityException se) {
+ throw DataXException.asDataXException(
+ S3WriterErrorCode.SECURITY_NOT_ENOUGH,
+ String.format("您没有权限查看目录 : [%s]", path));
+ }
+
+ String fileSuffix;
+ for (int i = 0; i < mandatoryNumber; i++) {
+ // handle same file name
+
+ Configuration splitedTaskConfig = this.writerSliceConfig
+ .clone();
+
+ String fullFileName = null;
+ fileSuffix = UUID.randomUUID().toString().replace('-', '_');
+ fullFileName = String.format("%s__%s", filePrefix, fileSuffix);
+ while (allFiles.contains(fullFileName)) {
+ fileSuffix = UUID.randomUUID().toString().replace('-', '_');
+ fullFileName = String.format("%s__%s", filePrefix,
+ fileSuffix);
+ }
+ allFiles.add(fullFileName);
+
+ splitedTaskConfig
+ .set(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FILE_NAME,
+ fullFileName);
+
+ LOG.info(String.format("splited write file name:[%s]",
+ fullFileName));
+
+ writerSplitConfigs.add(splitedTaskConfig);
+ }
+ LOG.info("end do split.");
+ return writerSplitConfigs;
+ }
+ }
+
+ public static class Task extends Writer.Task {
+ private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+ private Configuration writerSliceConfig;
+
+ private String path;
+
+ private String fileName;
+
+ @Override
+ public void init() {
+ this.writerSliceConfig = this.getPluginJobConf();
+ this.path = this.writerSliceConfig.getString(Key.PATH);
+ this.fileName = this.writerSliceConfig
+ .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FILE_NAME);
+ }
+
+ @Override
+ public void prepare() {
+
+ }
+
+ @Override
+ public void startWrite(RecordReceiver lineReceiver) {
+ LOG.info("begin do write...");
+ String fileFullPath = this.buildFilePath();
+ LOG.info(String.format("write to file : [%s]", fileFullPath));
+
+ OutputStream outputStream = null;
+ try {
+ File newFile = new File(fileFullPath);
+ newFile.createNewFile();
+ outputStream = new FileOutputStream(newFile);
+ UnstructuredStorageWriterUtil.writeToStream(lineReceiver,
+ outputStream, this.writerSliceConfig, this.fileName,
+ this.getTaskPluginCollector());
+ } catch (SecurityException se) {
+ throw DataXException.asDataXException(
+ S3WriterErrorCode.SECURITY_NOT_ENOUGH,
+ String.format("您没有权限创建文件 : [%s]", this.fileName));
+ } catch (IOException ioe) {
+ throw DataXException.asDataXException(
+ S3WriterErrorCode.Write_FILE_IO_ERROR,
+ String.format("无法创建待写文件 : [%s]", this.fileName), ioe);
+ } finally {
+ IOUtils.closeQuietly(outputStream);
+ }
+
+ String s3Bucket = this.writerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.S3_BUCKET);
+ String s3AccessKey = this.writerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.S3_ACCESS_KEY);
+ String s3SecretKey = this.writerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.S3_SECRET_KEY);
+ String s3Endpoint = this.writerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.S3_ENDPOINT);
+
+ S3Util s3Util = new S3Util(s3Bucket, s3AccessKey, s3SecretKey, s3Endpoint);
+ s3Util.upload(fileFullPath, fileFullPath);
+
+ try{
+ File file = new File(fileFullPath);
+ file.delete();
+ }catch(Exception e){
+ System.out.println("delete file failed");
+ }
+
+ LOG.info("end do write");
+ }
+
+ private String buildFilePath() {
+ boolean isEndWithSeparator = false;
+ switch (IOUtils.DIR_SEPARATOR) {
+ case IOUtils.DIR_SEPARATOR_UNIX:
+ isEndWithSeparator = this.path.endsWith(String
+ .valueOf(IOUtils.DIR_SEPARATOR));
+ break;
+ case IOUtils.DIR_SEPARATOR_WINDOWS:
+ isEndWithSeparator = this.path.endsWith(String
+ .valueOf(IOUtils.DIR_SEPARATOR_WINDOWS));
+ break;
+ default:
+ break;
+ }
+ if (!isEndWithSeparator) {
+ this.path = this.path + IOUtils.DIR_SEPARATOR;
+ }
+ return String.format("%s%s", this.path, this.fileName);
+ }
+
+ @Override
+ public void post() {
+
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+ }
+}
diff --git a/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3WriterErrorCode.java b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3WriterErrorCode.java
new file mode 100755
index 000000000..3bd817143
--- /dev/null
+++ b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3WriterErrorCode.java
@@ -0,0 +1,41 @@
+package com.alibaba.datax.plugin.writer.s3writer;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+/**
+ * Created by haiwei.luo on 14-9-17.
+ */
+public enum S3WriterErrorCode implements ErrorCode {
+
+ CONFIG_INVALID_EXCEPTION("TxtFileWriter-00", "您的参数配置错误."),
+ REQUIRED_VALUE("TxtFileWriter-01", "您缺失了必须填写的参数值."),
+ ILLEGAL_VALUE("TxtFileWriter-02", "您填写的参数值不合法."),
+ Write_FILE_ERROR("TxtFileWriter-03", "您配置的目标文件在写入时异常."),
+ Write_FILE_IO_ERROR("TxtFileWriter-04", "您配置的文件在写入时出现IO异常."),
+ SECURITY_NOT_ENOUGH("TxtFileWriter-05", "您缺少权限执行相应的文件写入操作.");
+
+ private final String code;
+ private final String description;
+
+ private S3WriterErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Code:[%s], Description:[%s].", this.code,
+ this.description);
+ }
+
+}
diff --git a/s3writer/src/main/resources/plugin.json b/s3writer/src/main/resources/plugin.json
new file mode 100755
index 000000000..5ca1cb7d9
--- /dev/null
+++ b/s3writer/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "s3writer",
+ "class": "com.alibaba.datax.plugin.writer.s3writer.S3Writer",
+ "description": "upload file to aws s3",
+ "developer": "Chyroc"
+}
\ No newline at end of file
diff --git a/s3writer/src/main/resources/plugin_job_template.json b/s3writer/src/main/resources/plugin_job_template.json
new file mode 100644
index 000000000..142e9b386
--- /dev/null
+++ b/s3writer/src/main/resources/plugin_job_template.json
@@ -0,0 +1,15 @@
+{
+ "name": "s3writer",
+ "parameter": {
+ "s3Bucket": "",
+ "s3AccessKey": "",
+ "s3SecretKey": "",
+ "s3Endpoint": "",
+
+ "path": "",
+ "fileName": "",
+ "writeMode": "",
+ "fieldDelimiter":"",
+ "dateFormat": ""
+ }
+}
\ No newline at end of file