澳门太陽城集团 1

【澳门太陽城集团】文件大于10M的JAR和资源文件不能上传到Dataworks,或者mongoSpiltter里拿出conf里的数据

原标题:通过简单瘦身,解决Dataworks 10M文件限制问题

我们的业务是要使用mongodb的Hadoop
driver处理输出。我们重写的mongodbInputFormat的时候传递数据的时候是把数据写入conf,然后再从mongoSplitter里面里面从conf里面读出来。比如下面这样:

摘要:
用户在DataWorks上执行MapReduce作业的时候,文件大于10M的JAR和资源文件不能上传到Dataworks,导致无法使用调度去定期执行MapReduce作业。
解决方案: jar -resources test_mr.

把数据放入数据conf:

用户在DataWorks上执行MapReduce作业的时候,文件大于10M的JAR和资源文件不能上传到Dataworks,导致无法使用调度去定期执行MapReduce作业。

List<Long> tagsUrns =null;
 //tagUrns 赋值…..
 conf.set(“tagUrns”,
            ObjectSerializer.serialize((Serializable) tagsUrns));

解决方案:

在mapper,reduce,或者mongoSpiltter里拿出conf里的数据:

第一步:大于10M的resources通过MaxCompute CLI客户端上传,

List<Long> tagUrns = (List<Long>) ObjectSerializer
            .deserialize(context.getConfiguration().get(“tagUrns”));

客户端下载地址:

由于conf只能放入boolean、int、string的值,而我需要给hadoop
Configuration放入的是list或者其他对象,所以需要用到一个序列化工具类。

客户端配置AK、EndPoint:

序列化工具类代码:

add jar C:\test_mr\test_mr.jar -f;//添加资源

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

第二步:目前通过MaxCompute
CLI上传的资源,在Dataworks左侧资源列表是找不到的,只能通过list
resources查看确认资源;

import java.io.*;

list resources;//查看资源

public class ObjectSerializer {

第三步:瘦身Jar,因为Dataworks执行MR作业的时候,一定要本地执行,所以保留个main就可以;

private static final Log log =
LogFactory.getLog(ObjectSerializer.class);

澳门太陽城集团 1

public static String serialize(Serializable obj) throws IOException {
    if (obj == null)
        return “”;
    try {
        ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
        ObjectOutputStream objStream = new
ObjectOutputStream(serialObj);
        objStream.writeObject(obj);
        objStream.close();
        return encodeBytes(serialObj.toByteArray());
    } catch (Exception e) {
        throw new IOException(“Serialization error: ” + e.getMessage(),
e);
    }
}

通过上述方法,我们可以在Dataworks上跑大于10M的MR作业。

public static Object deserialize(String str) throws IOException {
    if (str == null || str.length() == 0)
        return null;
    try {
        ByteArrayInputStream serialObj = new ByteArrayInputStream(
                decodeBytes(str));
        ObjectInputStream objStream = new
ObjectInputStream(serialObj);
        return objStream.readObject();
    } catch (Exception e) {
        throw new IOException(“Deserialization error: ” +
e.getMessage(), e);
    }
}

作者:隐林

public static String encodeBytes(byte[] bytes) {
    StringBuffer strBuf = new StringBuffer();

​本文为云栖社区原创内容,未经允许不得转载。返回搜狐,查看更多

    for (int i = 0; i < bytes.length; i++) {
        strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ((int)
‘a’)));
        strBuf.append((char) (((bytes[i]) & 0xF) + ((int) ‘a’)));
    }

责任编辑:

    return strBuf.toString();
}

public static byte[] decodeBytes(String str) {
    byte[] bytes = new byte[str.length() / 2];
    for (int i = 0; i < str.length(); i += 2) {
        char c = str.charAt(i);
        bytes[i / 2] = (byte) ((c – ‘a’) << 4);
        c = str.charAt(i + 1);
        bytes[i / 2] += (c – ‘a’);
    }
    return bytes;
}

}

相关文章