• 坚守一条红线 维护生态安全(在习近平新时代中国特色社会主义思想指引下——新时代新作为新篇章) 2019-11-12
  • 《辉煌中国》第五集《共享小康》 2019-11-12
  • 这事咋办No.2丨申请西安保障性住房必看 花生让你有房住 2019-11-11
  • 预热世界杯 玩法各异!3张竞彩红单了解一下 2019-10-18
  • 崇拜不劳而获是腐败的根源之一,正气不足是腐败的第二个根源,沉迷于初级趣味易滋生腐败,提高素质力争不想腐,以医者之心防治腐败。 2019-10-09
  • 人民日报评论员随笔:让文化热情涵养更多经典 2019-10-09
  • 国产手机跟进“刘海屏”,凸显苹果在智能手机市场的影响力 2019-10-05
  • 世界杯倒计时:球迷街头狂欢为自己的国家打Call 2019-10-02
  • 紫光阁中共中央国家机关工作委员会 2019-09-24
  • 洪崖洞客流激增 渝中开通2条应急通道12辆公交车驰援 2019-09-24
  • 中央环保督察“回头看”10省区问责630人 2019-09-19
  • 天津举办改善营商环境专题讲座 2019-09-19
  • 日本核能行业誓言日本将在今年内重新启用核电 2019-09-03
  • 中国保险行业协会发布汽车后市场配件合车标准 2019-09-02
  • 晋中市通报五起违反中央八项规定精神问题 2019-08-30
  • 当前位置 > CPDA数据分析师 > “数”业专攻 > Hadoop动态调整Map Task内存资源大小

    四川快乐12直选遗漏走势图:Hadoop动态调整Map Task内存资源大小

    浙江快乐彩和值走势图 www.pn-vs.com 来源:数据分析师 CPDA | 时间:2015-11-26 | 作者:admin

    前言

    我们都知道,在Hadoop中,一个Job的执行需要转化成1个个的Task去执行,在Task中,有会有2个类型,一个为Map Task,另一个就是Reduce Task.当然,这不是最底层的级别,在Task内部,还可以再分为TaskAttempt,叫做任务尝试,任务尝试姑且不在本篇文章的论述范围内.OK,针对每个Task,他当然会有他的资源使用量,广义的来讲,资源分为2个概念,1个是Memory 内存,另一个是Vcores,虚拟核数.这些资源的分配情况非常的关键,因为资源分少了,可能空闲集群资源浪费了,又可能会导致oom内存不够用的问题,假设你内存分小了,既然这样,那我们把资源调大了是不就行了,当然会导致一个狼多羊少的问题,毕竟资源有限,你用的多了,别人就会用的少了.所以这里就会衍生出一个问题,对于Job中的每个Task,我该如何去设置可使用的资源量呢,采用默认统一的map.memory.mb这样的配置显然不是一个好的解决办法,其实让人可以马上联想到的办法就是能够数据量的大小动态调整分配的资源量,这无疑是最棒的方案,下面就来简单的聊聊这个方案.

    资源调配的指标

    什么是资源调配的指标,通俗的讲就是一个资源分配的参考值,依照这个值,我可以进行资源的动态分配,这也是非常符合正常逻辑思维的方式的.在这里的标准值就是处理数据量的大小,所以调整的目标对象就是Map Task而不是Reduce Task.那么这个数据可以怎么拿到呢,稍微了解过Hadoop的人一定知道map的过程是如何拿到数据的,简单的就是从inputSplit中拿到数据,而这个inputSplit当然会被保留在Map Task中.就是下面所示的代码中:

    @SuppressWarnings("rawtypes")
    public class MapTaskAttemptImpl extends TaskAttemptImpl {
      private final TaskSplitMetaInfo splitInfo;
      public MapTaskAttemptImpl(TaskId taskId, int attempt, 
      EventHandler eventHandler, Path jobFile, 
      int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
      TaskAttemptListener taskAttemptListener, 
      Token<JobTokenIdentifier> jobToken,
      Credentials credentials, Clock clock,
      AppContext appContext) {
    super(taskId, attempt, eventHandler, 
    taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
    jobToken, credentials, clock, appContext);
    this.splitInfo = splitInfo;
      }
    

    在这个TaskSplitMetaInfo中就会有输入数据长度的一个变量

    /**
       * This represents the meta information about the task split that the 
       * JobTracker creates
       */
      public static class TaskSplitMetaInfo {
        private TaskSplitIndex splitIndex;
        private long inputDataLength;
        private String[] locations;
        public TaskSplitMetaInfo(){
          this.splitIndex = new TaskSplitIndex();
          this.locations = new String[0];
        }
    ...

    后面我们就用到这个关键变量.

    需要调整的资源变量

    上文中已经提过,目标资源调整的维度有2个,1个是内存,还有1个核数,数据量的大小一般会直接与内存大小相关联,核数偏重于处理速度,所以我们应该调整的task的内存大小,也就是map.memory.mb这个配置项的值.这个配置项的默认值是1024M,就是下面这个配置:

    public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
      public static final int DEFAULT_MAP_MEMORY_MB = 1024;

    如果你在配置文件中没配的话,他走的就是默认值,个人感觉这个值还是有点偏大的,如果一个Job一不小心起了1000多个Task,那么1T的内存就用掉了.OK,下面看下这个变量是保存在哪个变量中的呢,进入到TaskAttemptImpl中你就可以发现了.

    /**
     * Implementation of TaskAttempt interface.
     */
    @SuppressWarnings({ "rawtypes" })
    public abstract class TaskAttemptImpl implements
        org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
          EventHandler<TaskAttemptEvent> {
    
      static final Counters EMPTY_COUNTERS = new Counters();
      private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
      private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
      private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
    
      protected final JobConf conf;
      protected final Path jobFile;
      protected final int partition;
      protected EventHandler eventHandler;
      private final TaskAttemptId attemptId;
      private final Clock clock;
      private final org.apache.hadoop.mapred.JobID oldJobId;
      private final TaskAttemptListener taskAttemptListener;
      private final Resource resourceCapability;
      protected Set<String> dataLocalHosts;
    ...

    就是上面这个resourceCapability,在这里就会有核数和内存2个资源指标.

    @Public
    @Stable
    public static Resource newInstance(int memory, int vCores) {
      Resource resource = Records.newRecord(Resource.class);
      resource.setMemory(memory);
      resource.setVirtualCores(vCores);
      return resource;
    }
    
    /**
     * Get <em>memory</em> of the resource.
     * @return <em>memory</em> of the resource
     */
    @Public
    @Stable
    public abstract int getMemory();
      
    /**
     * Set <em>memory</em> of the resource.
     * @param memory <em>memory</em> of the resource
     */
    @Public
    @Stable
    public abstract void setMemory(int memory);
    

    到时就可以在这边进行设置.

    动态调整Map TaskAttempt内存大小

    刚刚上文中已经提到过,想要根据处理数据量的大小来调整Map的内存大小,首先你要有一个标准值,比如1个G的数据量对应1个G的内存值,然后如果你这次来了512M的数据,那么我就配512/1024(就是1个G)*1024M=512M,所以我最后分的内存就是512M,如果数据量大了,同理.这个标准值当然要做出可配,有使用方来决定.姑且用下面新的配置值来定义:

    public static final String MAP_MEMORY_MB_AUTOSET_ENABLED = "map.memory-autoset.enabled";
      public static final String DEFAULT_MEMORY_MB_AUTOSET_ENABLED = "false";
    
      public static final String MAP_UNIT_INPUT_LENGTH = "map.unit-input.length";
      public static final int DEFAULT_MAP_UNIT_INPUT_LENGTH = 1024 * 1024 * 1024;

    然后在Map TaskAttempt中加上动态调整方法,如果你开启了此项新功能,则会执行方法中的部分操作.

    public class MapTaskAttemptImpl extends TaskAttemptImpl {
      private final TaskSplitMetaInfo splitInfo;
      public MapTaskAttemptImpl(TaskId taskId, int attempt, 
      EventHandler eventHandler, Path jobFile, 
      int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
      TaskAttemptListener taskAttemptListener, 
      Token<JobTokenIdentifier> jobToken,
      Credentials credentials, Clock clock,
      AppContext appContext) {
    super(taskId, attempt, eventHandler, 
    taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
    jobToken, credentials, clock, appContext);
    this.splitInfo = splitInfo;
    autoSetMemorySize();
      }
    ...
      private void autoSetMemorySize() {
    int memory;
    int unitInputLength;
    int unitMemorySize;
    boolean isMemoryAutoSetEnabled;
    Resource resourceCapacity;
    isMemoryAutoSetEnabled =
    Boolean.parseBoolean(conf.get(
    MRJobConfig.MAP_MEMORY_MB_AUTOSET_ENABLED,
    MRJobConfig.DEFAULT_MEMORY_MB_AUTOSET_ENABLED));
    //判断是否开启动态调整内存功能
    if (isMemoryAutoSetEnabled) {
      unitInputLength =
      conf.getInt(MRJobConfig.MAP_UNIT_INPUT_LENGTH,
      MRJobConfig.DEFAULT_MAP_UNIT_INPUT_LENGTH);
      unitMemorySize =
      conf.getInt(MRJobConfig.MAP_MEMORY_MB,
      MRJobConfig.DEFAULT_MAP_MEMORY_MB);
      memory =
      (int) (Math.ceil(1.0 * splitInfo.getInputDataLength()
      / unitInputLength) * unitMemorySize);
    } else {
      memory =
      conf.getInt(MRJobConfig.MAP_MEMORY_MB,
      MRJobConfig.DEFAULT_MAP_MEMORY_MB);
    }
    //调整内存资源量
    resourceCapacity = getResourceCapability();
    resourceCapacity.setMemory(memory);
      }
    

    在这里我做了特别处理,为了使分配的内存大小符合2的幂次方,我用了向上取整的方法算倍数,这样规范化一些.下面是单元测试案例

    @Test
    public void testMapTaskAttemptMemoryAutoSet() throws Exception {
      int memorySize;
      int adjustedMemorySize;
      Resource resourceCapacity;
    
      EventHandler eventHandler = mock(EventHandler.class);
      String[] hosts = new String[3];
      hosts[0] = "host1";
      hosts[1] = "host2";
      hosts[2] = "host3";
      TaskSplitMetaInfo splitInfo =
          new TaskSplitMetaInfo(hosts, 0, 2 * 1024 * 1024 * 1024l);
    
      TaskAttemptImpl mockTaskAttempt =
          createMapTaskAttemptImplForTest(eventHandler, splitInfo);
    
      resourceCapacity = mockTaskAttempt.getResourceCapability();
      memorySize = resourceCapacity.getMemory();
    
      // Disable the auto-set memorySize function
      // memorySize will be equal to default size
      assertEquals(MRJobConfig.DEFAULT_MAP_MEMORY_MB, memorySize);
    
      // Enable the auto-set memory function
      Clock clock = new SystemClock();
      ApplicationId appId = ApplicationId.newInstance(1, 1);
      JobId jobId = MRBuilderUtils.newJobId(appId, 1);
      TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
      TaskAttemptListener taListener = mock(TaskAttemptListener.class);
      Path jobFile = mock(Path.class);
      JobConf jobConf = new JobConf();
      jobConf.set(MRJobConfig.MAP_MEMORY_MB_AUTOSET_ENABLED, "true");
      jobConf.set(MRJobConfig.MAP_MEMORY_MB,
          String.valueOf(MRJobConfig.DEFAULT_MAP_MEMORY_MB));
      jobConf.set(MRJobConfig.MAP_UNIT_INPUT_LENGTH,
          String.valueOf(MRJobConfig.DEFAULT_MAP_UNIT_INPUT_LENGTH));
    
      TaskAttemptImpl taImpl =
          new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, splitInfo,
              jobConf, taListener, null, null, clock, null);
    
      resourceCapacity = taImpl.getResourceCapability();
      memorySize = resourceCapacity.getMemory();
      adjustedMemorySize =
          (int) (Math.ceil(1.0 * splitInfo.getInputDataLength()
              / MRJobConfig.DEFAULT_MAP_UNIT_INPUT_LENGTH) * MRJobConfig.DEFAULT_MAP_MEMORY_MB);
    
      // Enable the auto-set function,the memorySize will be changed
      assertEquals(adjustedMemorySize, memorySize);
    }
    

     

  • 坚守一条红线 维护生态安全(在习近平新时代中国特色社会主义思想指引下——新时代新作为新篇章) 2019-11-12
  • 《辉煌中国》第五集《共享小康》 2019-11-12
  • 这事咋办No.2丨申请西安保障性住房必看 花生让你有房住 2019-11-11
  • 预热世界杯 玩法各异!3张竞彩红单了解一下 2019-10-18
  • 崇拜不劳而获是腐败的根源之一,正气不足是腐败的第二个根源,沉迷于初级趣味易滋生腐败,提高素质力争不想腐,以医者之心防治腐败。 2019-10-09
  • 人民日报评论员随笔:让文化热情涵养更多经典 2019-10-09
  • 国产手机跟进“刘海屏”,凸显苹果在智能手机市场的影响力 2019-10-05
  • 世界杯倒计时:球迷街头狂欢为自己的国家打Call 2019-10-02
  • 紫光阁中共中央国家机关工作委员会 2019-09-24
  • 洪崖洞客流激增 渝中开通2条应急通道12辆公交车驰援 2019-09-24
  • 中央环保督察“回头看”10省区问责630人 2019-09-19
  • 天津举办改善营商环境专题讲座 2019-09-19
  • 日本核能行业誓言日本将在今年内重新启用核电 2019-09-03
  • 中国保险行业协会发布汽车后市场配件合车标准 2019-09-02
  • 晋中市通报五起违反中央八项规定精神问题 2019-08-30
  • 体彩6场半全场 pk10大小单双计划软件 jj比赛德州扑克 2019码报全年资料 广西体彩11选5玩法介绍 广东福彩好彩1详情 一定牛买彩票被骗 跳跳乐第十八套完整版视频 电子竞技最大比赛 欧洲足球赛事有哪些 龙虎合app下载 快三赚钱吗 湖南快乐十分钟开奖结果走势图 百人斗牛牛游戏下载 nba直播腾讯无插件