Lemcoden

来自于大数据攻城狮的分享

0%

硬件基础知识

CPU的制作

汇编语言的执行过程

计算机启动过程

操作系统的基本知识

进程线程纤程的基本概念

内存管理

进程与线程管理

中断与系统调用(软中断)

内核同步基础知识

关于硬盘IO DMA

我们接着上次的hive继续总结

配置补充,hiveserer2的高可用

node2-hive-site.xml

Read more »

who,what,why

hive的作用

按照做笔记的习惯来说,说一个新的大数据平台框架,一般先从模型说起,而hive本身是企业级数据仓库工具,基于mapreduce计算引擎的封装(2.x之后逐渐将官方计算引擎指定为spark)所以,就其本身而言并没有模型可以讨论.
但是我们可以聊聊他的作用,他是解决什么需求的:

Read more »

为什么要看源码

1.为了更好的使用框架的Api解决问题,比如说我们遇到一个问题,需要修改mapreduce分片的大小,如果没看过源码,可能会写很多代码,甚至重新调整文件block的大小上传,但是看过源码的都懂,只要简单的修改minSplite和maxSplite这两个配置属性就可以.
2.为了学习框架本身的设计方法,应用到日常开发中.
(此次源码分析的hadoop版本为2.7.2)

怎么看源码

要有目的性的的看源码,如果不带目的直接看的话,会很晕,源码一般信息量很大,而且很多部分是没有必要的,我们要取其精髓,忽略与当前目标无关的部分.并将重要的部分记录下来,最好是自己可以用伪代码实现,并且能够讲出其中的逻辑点和技术应用点

mapreduce源码梗概

mapreduce笔者目前了解的主要有三部分,client端,map计算端的输入输出,reduce计算端的输入输出.
client端主要验证client端的任务,以及关键切片部分的逻辑
map端和reduce端输入输出,一是看数据格式相关的转换
二是看shuffle的主要流程,看有哪些可以在开发过程中可以微调的地方

client端

1
job.waitForCompletion(true);

我们编写mapreduce程序的时候到最后执行这个方法的时候,任务才会真正提交,
那我们提交任务之后客户端都是如何做得呢?
点进去查看源码

1
2
3
4
5
6
7
8
9
10
11
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) { //检查任务状态,是否允许提交
submit(); //提交任务方法
}
if (verbose) {
monitorAndPrintJob(); //监控并且获取任务的详细运行信息
} else ...
return isSuccessful();
}

提交之后,再调用方法获取任务的详细信息,可见这个任务是异步任务.
我们关系心的任务如何提交的,那么就进入submit方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI(); //hadoop1.x和hadoop2.x的mapreduce架构不同,所以这里是新版API
connect(); //连接yarn resourceManager
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); //获取集群HDFS操作对象和Client对象,为以后把分片信息,配置文件,jar包,通过FileSystem上传到hdfs上
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster); //这里就是提交Job的地方
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}

再进入submitJobInternal方法,然后发现注释这部分很有东西

1
2
3
4
5
6
7
Internal method for submitting jobs to the system.
The job submission process involves:
1.Checking the input and output specifications of the job.
2.Computing the InputSplits for the job.
3.Setup the requisite accounting information for the DistributedCache of the job, if necessary.
4.Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.
5.Submitting the job to the JobTracker and optionally monitoring it's status.

简单的翻译以下就懂了,里面会

  1. 检查job输入输出路径
  2. 计算分片的大小
  3. 有必要的话,为作业的缓存设置账户信息
  4. 把job的jar包,配置文件拷贝到hdfs
  5. 提交job到JobTracker

然后我们再看代码,因为源代码比较多,这里只挑出重要的伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
//validate the jobs output specs
checkSpecs(job); //这个就是检查文件路径的方法

...

int maps = writeSplits(job, submitJobDir);//写入切片信息,返回切片数量

...

String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME); // 获取任务队列名,源码中有很多这样的获取配置信息的代码,这里只挑出一个说明一下

...

// Write job file to submit dir
writeConf(conf, submitJobFile); //写入conf文件到hdfs上

...

//真正提交客户端的方法
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
}

client的用户任务,在这里如何调用的基本了解清除了,我们重点看切片是如何写入的,毕竟这是hadoop生态的一个要点:如何通过分片实现计算向数据移动的,点开writeSplite方法

1
2
3
4
5
6
7
8
9
10
11
12
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir); //hadoop2.x使用newApi,所以进入这个方法
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
1
2
3
4
5
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
List<InputSplit> splits = input.getSplits(job);//不解释,再进入getSpiltes方法
}

进入之后,发现是InputFormat是个接口,有多个子类,那怎么办?查看子类,有 DB数据库的,有Line管每行记录的,切片当然是以文件系统为依托,所以选FileInputFormat
点进去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public List<InputSplit> getSplits(JobContext job) throws IOException {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
...
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
...
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}

return splites;
}

最后终于找到了关于切片的代码,首先开头有两个值minSize和maxSize,分别进入这些方法,发现默认的是0,和long型的最大值,并且受SPLIT_MINSIZE(mapreduce.input.fileinputformat.split.minsize)和SPLIT_MAXSIZE(mapreduce.input.fileinputformat.split.maxsize)这两个配置变量控制,然后继续往下走,有个cpmputeSpliteSize方法,用到了minSize和maxSize还有BlockSize,进入之后我们总算知道了切片如何计算大小

1
Math.max(minSize, Math.min(maxSize, blockSize))

它的语义就是以minSize为最小边界,maxSize为最大边界
如果blockSize没有超过最大最小边界,则SpliteSize取BlockSize的值
如果超过边界则取边界值.


继续追getSplites的代码
有一个getBlockIndex方法,获取块索引,并且块索引最后会放到切片信息中,
我们进入这个方法发现

1
2
3
4
5
6
7
8
9
10
protected int getBlockIndex(BlockLocation[] blkLocations,
long offset) {
for (int i = 0 ; i < blkLocations.length; i++) {
// is the offset inside this block?
if ((blkLocations[i].getOffset() <= offset) &&
(offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
return i;
}
}
}

注意if判断方法,语义就是如何取得切片的block索引,就是对切片在文件中偏移量,做一次”向下取整”,比如说第二block块的偏移量是50,而第二个切片的偏移量是75,位于第二个和第三个块(100)偏移量之间,也就是说,在真正进行计算的时候,会从块的第50的偏移量读取,

这就是为什么我们一般把分片大小设置为块大小的倍数,因为这样可以避免交叉读写.

最后就是写入分片信息包括分片的hosts,size,filepath,offset

有了这些信息,就可以支持日后的计算能够保证计算程序找到分片的位置,也就是支持计算向数据移动

map端源码

首先明确目的,我们Map端的源码是对输入输出进行分析,主要分析map两端的输入输出,

输入

已知我们Map是通过MapTask类运行的,那么我们就先进入MapTask类,先找run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {


if (conf.getNumReduceTasks() == 0) {
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
// If there are reducers then the entire attempt's progress will be
// split between the map phase (67%) and the sort phase (33%).
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}

...
if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);
}

首先映入眼帘的是getNumReduceTasks(),获取Reduce的数量,如果数量为零则不进行排序计算,不为排序任务分配全中
然后到下面的runNewMapper点进去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
// make a mapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);//初始化InputFormat
// rebuild the input split
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());//获取切片信息,保证自己拿到最近的切片数据.
LOG.info("Processing split: " + split);

org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);

job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;

// get an output object
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}

org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);

org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);

try {
input.initialize(split, mapperContext);
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}

我们先看下面try cacth里面的东西,一般看源码,try语句块里面的东西是比较重要的,
在try语句块当中,我们看到了,mapper对象通过run方法运行我们开发编写的map方法,
并且且有自己的输入输出.
然后从头开始捋,首先通过反射将我们编写的Map对象赋值给Mapper,中间注释的跳过,直接开门见山,我们看一下Mapper的输入类,进入到NewTrackingRecordReader

1
2
3
4
5
6
7
8
9
10
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
TaskReporter reporter,
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
...
throws InterruptedException, IOException {
this.real = inputFormat.createRecordReader(split, taskContext);
...
}

进入后发现这是个包装类,有nextKeyalue方法获取我们Map中所需要的键值对,而他调用的是real对象的nextKeyValue,
而real对象就是我们的LineRecordReader类型.进入有一个初始化类initialize

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();

// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
...
fileIn.seek(start);
...
if (start != 0) {
start += in.readLine(new Text(), 0,maxBytesToConsume(start));
}
this.pos = start;
}

通过seek方法获从自己相应的切片偏移量开始读取信息,
最后一个判断是,默认跳过第一条数据的读取,因为切块的原因很有可能第一条信息不完整.然后我们知道,NewTrackingRecordReader在Context对象里而我们的LineRecordReader在NewTrackingRecordReader当中,所以其实最后context对象调用的nextKeyValue其实调用的是LineRecordReader的方法

1
public boolean nextKeyValue() throws IOException {
key.set(pos);
1
2
}

而在nextKeyValue里面有一个key.set(pos)其实就是文件的行号赋值给key

输出

好了,输入看完了我们再看一下输出,重新回到MapTask类,我们点开输出类,NewOutputCollector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
collector = createSortingCollector(job, reporter);//获取排序的数据收集器
partitions = jobContext.getNumReduceTasks();//根据reduce数量进行分区,如果分区数量等于1使用默认的分区器将数据分区为一
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}
}

代码的中文注释已经比较详细了,我们继续走,看看排序收集器里面都是什么东西

打开createSortingCollector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
createSortingCollector(JobConf job, TaskReporter reporter) {
Class<?>[] collectorClasses = job.getClasses(
JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
int remainingCollectors = collectorClasses.length;
Exception lastException = null;
for (Class clazz : collectorClasses) {
try {
if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
throw new IOException("Invalid output collector class: " + clazz.getName() +
" (does not implement MapOutputCollector)");
}
Class<? extends MapOutputCollector> subclazz =
clazz.asSubclass(MapOutputCollector.class);
LOG.debug("Trying map output collector class: " + subclazz.getName());
MapOutputCollector<KEY, VALUE> collector =
ReflectionUtils.newInstance(subclazz, job);
collector.init(context);
LOG.info("Map output collector class = " + collector.getClass().getName());
return collector;
}
}

最后知道了,输出数据的排序收集器就是唤醒缓冲区MapOutputBuffer的子类,打开他的init方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public void init(MapOutputCollector.Context context
) throws IOException, ClassNotFoundException {


...
//sanity checks
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
...
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
QuickSort.class, IndexedSorter.class), job);

...
// k/v serialization
comparator = job.getOutputKeyComparator();
keyClass = (Class<K>)job.getMapOutputKeyClass();
valClass = (Class<V>)job.getMapOutputValueClass();
serializationFactory = new SerializationFactory(job);
keySerializer = serializationFactory.getSerializer(keyClass);
keySerializer.open(bb);
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);

...
// compression
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
job.getMapOutputCompressorClass(DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
} else {
codec = null;
}
...
if (combinerRunner != null) {
final Counters.Counter combineOutputCounter =
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
} else {
combineCollector = null;
}
...
spillThread.setDaemon(true);
spillThread.setName("SpillThread");
spillLock.lock();
try {
spillThread.start();
while (!spillThreadRunning) {
spillDone.await();
}
} catch (InterruptedException e) {
throw new IOException("Spill thread failed to initialize", e);
} finally {
spillLock.unlock();
}
if (sortSpillException != null) {
throw new IOException("Spill thread failed to initialize",
sortSpillException);
}
}

下面的粗略的的分组,代码分别是

  • 设置缓冲取大小和溢写百分比(默认100M和0.8)
  • 设置缓冲区数据的排序类(默认快速排序)
  • 获取排序比较器(优先获取设置的比较类,没有的话取Writable类型默认的比较器)
  • 将keyvalue键值对序列化
  • 判断是否启用combiner,如果溢写的小文件数量超过3,则启用combiner合并
  • 获取压缩对象
  • 开启溢写线程

这里多嘴几句,因为篇幅有限,先写下buffer的一些特性,以后可以在这个类的源码中验证:

  • buffer本质还是字节数组
  • buffer有赤道的概念,即分界点,一边输入数据,一边输入索引
  • 索引:固定宽度:16字节,4个int(partition,keystart,valuestart,valuelenth)
  • combiner默认发生在溢写之前,排序之后

reduce端源码

我们先看Reducer类的注释,头注释翻译过来大概意思就是

reducer主要做两件事,一件是拉取shuffle的数据,一件是对数据进行sort,这里的排序不是对数据进行在排序,因为map已经对数据进行过排序了,这里是对map排序过的数据文件进行归并.
好了要点说完了,我们直接看ReudceTask的run方法
其中有一句代码是这样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
shuffleConsumerPlugin.init(shuffleContext);
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
shuffleConsumerPlugin.init(shuffleContext);

rIter = shuffleConsumerPlugin.run();

最后一句,通过shfulle插件获取迭代器,我们知道基本reduce的数据都是通过迭代器获取的

迭代器的使用

为什么使用迭代器呢?因为我不可能将数据一次性的装进内存里,最好是通过迭代器维护一个对文件的指针,这样不仅遍历和实现分离,而且谁想要读取这个文件只要生成一个迭代器,维护自己的指针就可以,不会出现强指针或者同一个数据文件占多份内存的情况.
然后我们追踪Reducer的的迭代器类,追踪路径如下
context.getValues().iterator(); -> ReudceContext -> ReduceContextImpl
进入reduce实现类之后最后有一个getValues方法

1
2
3
4
public
Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
return iterable;
}

他返回一个Iterable对象,而Iterable只有一个方法,返回iterator对象

1
2
3
4
5
private ValueIterator iterator = new ValueIterator();
@Override
public Iterator<VALUEIN> iterator() {
return iterator;
}

而iterator绝对有hasNext对象和next方法(迭代器模式常识)
我们看一下他的hasNext方法

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public boolean hasNext() {
try {
if (inReset && backupStore.hasNext()) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("hasNext failed", e);
}
return firstValue || nextKeyIsSame;
}

最后有一个boolean值nextKeyIsSame,我们先记住它然后我们看ReducerContextImpl的另一个方法nextkey方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public boolean nextKey() throws IOException,InterruptedException {
while (hasMore && nextKeyIsSame) {
nextKeyValue();
}
if (hasMore) {
if (inputKeyCounter != null) {
inputKeyCounter.increment(1);
}
return nextKeyValue();
} else {
return false;
}
}

最后nextKey方法会调用nextkeyValue,这里只给出nextKeyValue的最后一句代码

1
2
3
4
5
6
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
currentRawKey.getLength(),
nextKey.getData(),
nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition()
) == 0;

他会他通过判断器进行判断,下一个key是否和现在的key相等,把结果值赋值给nextKeyIsSame,对就是刚刚记住的nextKeyIsSame.
也就是说,我们调用的values.hasNext方法,会判断nextKeyIsSame,下一个key是否相同,不同则返回false,触发reducor再次重新调用reduce方法.

比较器的使用

我们再看一下,ReducerTask的run方法,这里给出关键点

1
2
3
4
5
6
7
8
9
10
11
RawComparator comparator = job.getOutputValueGroupingComparator();

public RawComparator getOutputValueGroupingComparator() {
Class<? extends RawComparator> theClass = getClass(
JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
if (theClass == null) {
return getOutputKeyComparator();
}

return ReflectionUtils.newInstance(theClass, this);
}

这个是获取分组比较器的方法,优先使用用户的分组比较器,如果用户的分组比较器为null,则使用默认的key的Writable类型包含的比较器.
而reduce也有排序比较器通过getOutputKeyComparator()获取,
再加上map的排序比较器,我们有三个比较器可以自定义也可取默认的比较器,mapreduce给了我们很灵活的选择取加工数据.

背景,

为了体现分布式计算的优点,将数据分而治之再进行相应方面的计算.hadoop提出了mapreduce计算模型

计算模型

计算模型

  • map 端负责将拆分出来的数据进行映射,变换,过滤.体现在一进N出
  • reduce 端负责将数据整合归纳,缩小,分解,一般是一组数据进N出
  • 不管是map还是reduce处理的数据结构基本都是<key,value>的形式划分的
  • 最基本的数据格式确定后,会有数据迁移更加细致的流程
  • 首先分布式计算是基于分布式文件系统的,而分布式文件系统的存储模型以块为单位,所以分布式的物理模型以split(分片)为的单位
  • 默认每个split对应一个map进程
  • split的数据对应map计算之后并不会直接写入磁盘而是先写入环形缓冲区 || 因为每一次IO都会调用linux内核,所以不是一条记录IO一次,而是缓冲区写满后进行一次性IO
  • 跳过中间阶段,看reduce,reduce会根据之前数据的partion数量对应开启reduce进程.
  • 默认一个reduce进程对应一个partition,再次体现分而治之的理念
  • map段的数据经过buffer之后会为reduce分区作准备,所以会先进行分区,对key进行取模操作划分出partition,会将数据结构转换成<key,value,partition>的形式.
  • 进行partition之后,为减少reduce的拉取IO操作(总不能一条数据拉取查找一次吧),会将partition按照进行再次分片(split).
  • 数据进行分片之后,再按照partition进行小文件排序(sort),同时还会进行key的第二次排序,关于为什么还会进行key的排序,到reduce端会解释

    架构设计

    遥远的hadoop1.x

    1
    (ps:因为找不到合适的图,就分开为client端和job端的架构)
    计算模型出现后,就需要搭建整体的框架,首先我们说我们的主要角色有client,JobTracker,TaskTracker
    clinet架构

我们client端主要做四件事:

  • 会根据每次的计算数据,咨询NN元数据(block) => 算出spilt切片清单
  • 生成计算程序未来运行相关的配置文件
  • 将jar包,split的切片清单,配置文件上传到HDFS目录当中
  • cli调用jobTracker,通知启动一个计算程序,并且告知文件都放在了hdfs的哪些地方

job架构

  • jobTracker会根据cli提交的信息,去HDFS上寻找Jar包程序,split清单,以及配置文件
  • 根据拿到的切片清单和配置文件,以及收到的TaskTracker汇报的资源,最终确定每一个spilt应该去往哪个个节点
  • TaskTracker会在汇报心跳的时候拿到分配给自己的人物信息
  • TaskTrakcer取回任务后会从hdfs下载jar包,xml配置文件到本机
  • TaskTraker会根据xml配置文件以及JobTrakcer的任务描述,从jar包中抽取出mapreduce任务运行

这个是hadoop1.x的mapreduce的任务调度,到了hadoop2.x的时候这种架构被重新修改,why?

  1. 任务调度框架jobTracker和TaskTracker使用的是主从架构,那必将出现两个问题,一个是单点故障问题
  2. 另一个则是主节点压力过大的问题
  3. JobTracker同时负责资源的调度以及计算任务管理,两者耦合,如果引入新的计算框架则不能复用资源管理

    改进后的Hadoop2.x

    hadoop2.x后将JobTraker的资源调度功能抽出,封装为Yarn资源管理框架,并配置了高可用.
    hadoop2.x的计算与资源管理架构如下
    job架构
    主要角色有client,ResourceManager,NodeManager,ApplicatioMaster以及Container
  • client与之前的流程一致
  • client会将job提交到ResourceManager
  • ResourceManger接收到job请求后,会在集群当中挑一台不忙的节点,在NodeManager中启动一个ApplicationMaster进程
  • ApplicationMaster进程启动之后,会去HDFS下载Splite清单以及配置文件,并将配置清单发送ResouceManager,申请Container
  • ResouceManager会根据清单计算出使用多少资源,并将根据现有资源通知NodeManager启动相应的Container容器
  • Container向App Mstr(Application Master)反向注册,此时App Mstr才知道有多少Container可以执行任务
  • App Mstr会向Container发送Map Task消息.
  • Container受到消息后,会从hdfs下载jar包,并通过反射取出对象执行MapReduce任务
    相较于Hadoop1.x,2.x的框架很好的解决的1.x框架出现的问题:
  1. 单点故障
  • App Mstr由ResouceManager监控管理,所以当App Mstr没有心跳时,RM(Resource Manager)会触发失败重试机制,ResourceManager会在其他节点重新启动个App Mstr
  • ResourceManager本身可以配置高可用
  • Container 也会有失败重试
  1. 压力过大问题
  • yarn中每个计算程序自有一个AppMaster,每个AppMaster之负责自己计算程序的任务调度.
  • AppMasters是在不同的节点中启动的,默认有了负载的光环
  1. 资源管理与任务调度耦合
  • yarn只负责资源管理,不负责具体的任务调度
  • yarn作为资源管理框架可以被其他计算程序复用(只需要继承AppMaster类就可以)

基础设施

  • 网卡静态IP
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    ifconfig 查看网卡信息
    vim /etc/udev/rules.d/70-persistent-ipoib.rules
    ACTION=="add", SUBSYSTEM=="net", DRIVERS=="?*", ATTR{type}=="32", ATTR{address}=="?*00:02:c9:03:00:31:78:f2", NAME="网卡名"
    vim /etc/sysconfig/network-scripts/ifcfg-网卡名
    POXY_METHOD=none
    BROWSER_ONLY=no
    BOOTPROTO=static //设置静态IP
    DEFROUTE=yes
    NAME=enp0s3
    UUID=290c55a8-1b88-4d99-b741-dcfe455f5c2c
    DEVICE=enp0s3
    ONBOOT=yes
    IPADDR=192.168.0.101 //一般本地IP最后依次增加
    NETMASK=255.255.255.0
    GATWAY=192.168.0.1 //同一集群必须同一网关
  • 设置hosts
    1
    2
    3
    vim /etc/hosts
    192.168.0.101 hadoop01
    192.168.0.101 hadoop02
  • 关闭防火墙
    1
    2
    3
    4
    5
    6
    7
    8
    Centos6.x
    service iptables stop
    service iptables status
    chkconfig iptables off
    Centos7.x
    systemctl stop firewalld.service
    systemctl status firewalld.service
    systemctl disable firewalld.service
  • 关闭 selinux
    1
    2
    vi /etc/selinux/config
    SELINUX=disabled
  • 作时间同步
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    yum install ntp  -y
    vi /etc/ntp.conf
    主节点
    注释掉其他server
    server ntp1.aliyun.com
    server 127.127.1.0
    fudge 127.127.1.0 stratum 10
    从节点
    server 192.168.0.101
    从节点设置crontab同步
    vim /etc/crontab
    10 20 * * * /usr/sbin/ntpdate -u 192.168.0.101
    systemctl start ntpd
    systemctl enable ntpd
  • 安装JDK
    1
    2
    3
    4
    5
    6
    7
    rpm -i   jdk-8u181-linux-x64.rpm
    *有一些软件只认:/usr/java/default
    vi /etc/profile
    export JAVA_HOME=/usr/java/default
    export PATH=$PATH:$JAVA_HOME/bin
    source /etc/profile | . /etc/profile

  • 免密登陆
    1
    2
    ssh-keygen -t rsa  //一路回车
    ssh-copy-id 其他节点

    集群安装配置

    1
    tar -zxvf tar包 -C 目录

    java配置

    1
    2
    3
    4
    5
    rpm -i   jdk-8u181-linux-x64.rpm
    *有一些软件只认:/usr/java/default
    vi /etc/profile
    export JAVA_HOME=/usr/java/default
    export PATH=$PATH:$JAVA_HOME/bin

hadooop 配置

NN NN JN ZKFC ZK DN RM NM
node01 * * *
node02 * * * * * *
node03 * * * * *
node04 * * * *

hadoop配置的七个文件:

core-site.xml hdfs-site.xml mapred-site.xml yarn-site.xml

hadoop-env.sh mapred-env.sh slaves

core-site.xml

1
2
3
4
5
6
7
8
9
<property>
<name>fs.defaultFS</name>
<value>hdfs://mycluster</value>
</property>

<property>
<name>ha.zookeeper.quorum</name>
<value>node02:2181,node03:2181,node04:2181</value>
</property>

hdfs-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 以下是  一对多,逻辑到物理节点的映射
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/var/bigdata/hadoop/local/dfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/var/bigdata/hadoop/local/dfs/data</value>
</property>

<property>
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2</value>
</property>
<property>
<name>dfs.namenode.rpc-address.mycluster.nn1</name>
<value>node01:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.mycluster.nn2</name>
<value>node02:8020</value>
</property>
<property>
<name>dfs.namenode.http-address.mycluster.nn1</name>
<value>node01:50070</value>
</property>
<property>
<name>dfs.namenode.http-address.mycluster.nn2</name>
<value>node02:50070</value>
</property>

#以下是JN在哪里启动,数据存那个磁盘
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://node01:8485;node02:8485;node03:8485/mycluster</value>
</property>
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/var/bigdata/hadoop/ha/dfs/jn</value>
</property>

#HA角色切换的代理类和实现方法,我们用的ssh免密
<property>
<name>dfs.client.failover.proxy.provider.mycluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_dsa</value>
</property>

#开启自动化: 启动zkfc
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>

mapred-site.xml

1
2
3
4
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>

yarn-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>node02:2181,node03:2181,node04:2181</value>
</property>

<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>lemcoden_yarn_cluster</value>
</property>

<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>node03</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>node04</value>
</property>

hadoop-env.sh

1
export JAVA_HOME=/usr/java/default

mapred-env.sh

1
export JAVA_HOME=/usr/java/default

slaves

1
2
3
node02
node03
node04

设置环境变量

1
2
3
4
5
vi /etc/profile
export JAVA_HOME=/usr/java/default
export HADOOP_HOME=/opt/bigdata/hadoop-2.6.5
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
source /etc/profile

软件包分发:

1
2
3
4
cd /opt
scp -r ./bigdata/ node02:`pwd`
scp -r ./bigdata/ node03:`pwd`
scp -r ./bigdata/ node04:`pwd`

zookeeper配置:

cp zoo_sanmple.cfg zoo.cfg

zoo.cfg

1
2
3
4
   datadir=/var/bigdata/hadoop/zk
server.1=node02:2888:3888
server.2=node03:2888:3888
server.3=node04:2888:3888

mkdir /var/bigdata/hadoop/zk
配置环境变量:
node02

1
2
3
4
vi /etc/profile
export ZOOKEEPER_HOME=/opt/bigdata/zookeeper-3.4.6
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZOOKEEPER_HOME/bin
. /etc/profile

分发软件包:

1
2
scp -r ./zookeeper-3.4.6  node03:`pwd`
scp -r ./zookeeper-3.4.6 node04:`pwd`

设置myid:

1
2
3
4
5
6
7
8
9
10
node03:
mkdir /var/bigdata/hadoop/zk
echo 2 > /var/bigdata/hadoop/zk/myid
*环境变量
. /etc/profile
node04:
mkdir /var/bigdata/hadoop/zk
echo 3 > /var/bigdata/hadoop/zk/myid
*环境变量
. /etc/profile

集群初始化启动

先启动JN (node1~node3)

1
hadoop-daemon.sh start journalnode

选择一个NN格式化(只在初始化时做一次)

1
hdfs namenode -format

在另一台机器里同步元数据(先启动node1的namenode)

1
hdfs namenode -bootstrapStandby

格式化zk(也是只做一次)

1
hdfs zkfc  -formatZK

启动集群

1
2
3
4
start-dfs.sh
start-yarn.sh
node03-04:
yarn-daemon.sh start recourcemanager

关于HDFS原理在此写个总结
前三点主要围绕分布式文件系统那么多,为什么apache还要开发自己的文件系统
后两点主要围绕hdfs的高可用问题

1.HDFS存储模型

  • 1.hdfs的存储模型第一个核心为block(块),hdfs的所有存储文件都是按照块来进行划分的,每个文件可以有不同的块,但是文件中除了最后一个块,每个块的大小必须相同,这个为了保证可以和hadoop计算框架,相适应能够有一个统一的计算单位,这个统一的计算单位block不是固定的,需要根据具体的I/O特性进行调整.

  • 2.除了围绕块之外存储模型还有一个核心是存储副本(replication),副本可以冗余数据保证系统的可靠性.并多个副本存储在不同主机当中可以增加计算程序与数据在同一集群的概率,提升计算的性能.

    2.HDFS的角色

    HDFS主要角色

    主要角色有两个namenode和datanode,主要功能包括如下:

  • 1.namenode主要维护文件的元数据

  • 2.datanode主要维护负责block的读和写

  • 3.datanode会与namenode维持心跳,并汇报自己持有的block信息和列表

  • 4.clinet向Namenode交互文件元数据.和datanode交互文件blocks数据

次要角色:SecodaryNamenode

要聊SecondaryNamenode就需要先知道Namenode是如何持久化元数据文件的
首先,NameNode维护元数据是在内存中,如果机器突然宕掉,如果不把元数据写在磁盘上,那时没有办法恢复的,元数据会永久丢失

目前持久化有两种方式:

  • 1.通过操作日志文件(EditLog)恢复,也就是每当NameNode有一条对元数据的操作,就会增加一条日志,但是这样的方式有两个缺点,第一随着运行时间的增长,NN的log会变得及大,很浪费磁盘空间,第二点也是运行时间长的话,要恢复需要很长时间,比如这台机器运行了十年,宕掉之后可能机器需要五年的时间恢复.
  • 2.通过内存的快照(fsImage)恢复,就是将某个时间点的内存状态溢写到磁盘上,但这种方式没办法实时的保存,磁盘IO是有瓶颈的,我们不可能隔一两分钟就保存内存的镜像
    而NameNode保存选择了一个折衷的方式来规避两者明显的缺点:
    每相隔一个时间点,将Editlog里面的操作写入到fsImage当中,合并成为一个新的fsImage,然后将在之前产生旧EditLog,只保留最近fsImage时间点之后产生EditLog删掉,这样让fsImage滚动更新的方法,使得占少量的磁盘的情况下,能让NN恢复到机器宕掉那一刻的状态.
    但是如果把这个保存合并快照的工作全都的交给Namenode的话,此节点的压力会很大,所以关于fsImage的滚动合并工作便交给另一个工作节点SecondaryNamenode来执行

    3.角色交互产生的机制

    安全模式

    每次NN启动的时候,都会将最近时间点的fsImage加载进来,然后将EditLog操作合并到系统内存当中,最后将新的fsImage写入,并删除EditLog
    NN会文件的属性作为元数据,但不包括datanode列表,
    主要是为了防止

    NN重启后,DN(DataNode)列表中,有启动不起来DN,此时正好有客户端请求,NN返回了不可靠的DN列表

    这样的情况发生
    所以NN,在重启后,会接受所有DN心跳信号和状态报告.
    当确定NameNode检测某个数据块的副本达到最小值,那么他会被认为副本安全的,当有一定百分比的数据被确认是安全后,NameNode将会退出安全模式.

    副本放置策略

    第一个副本放置在本机,第二个副本放在不同机架,第三副本放置在同一机架,之后的副本随机.
    原因:
  • 为了能在程序计算的时候找到最近的block数据
  • 为了能在机架损毁的时候仍然能保留其完整的

    HDFS写流程

    读流程
  • Client与NN创建连接
  • NN建立元数据
  • NN验证元数据是否合理
  • NN触发副本放置策略,返回DN列表
  • Clinet将数据块分割为64K数据,并使用chunk(512B)和sumchunk进行填充
  • Clinet向DataNode发送数据块,第一个DN受到packet后本地保存发送到第二个DN
  • 第二个DN发送到第三个DN
  • 当block传输完成,DN分别向各NN汇报,同时Client继续传输下一个block
  • 所以client和block汇报也是并行的
  • 数据分割是为了保证DN可以在第一个数据包发送完成之后,可以立刻发送给第二个DN,保证传输效率以及传输的一致性,并且这样传输,对客户端来说是透明,客户端只要保证给第一个DataNode传输完整数据就可以.

    HDFS读流程

    写流程
  • 为了降低整体的带宽消耗和读延迟,HDFS会尽量让读取程序读取离他最近的副本
  • 如果再读取程序的同一个机架上有一个副本,那么就读该副本
  • 如果一个HDFS集群跨越多个数据中心,那么客户端也将首先读本地数据中心的副本
  • 语义:
       download a file
       Client和NN交互文件元数据获取fileBlockLocation
       NN按距离策略排序返回
       Client尝试下载Block并且校验数据完整性(校验盒校验)
  • 语义:下载一个文件其实是获取文件的所有的Block元数据,那么子集获取block应该成立
      Hdfs支持Client输出文件的offset自定义连接哪些Block的DN,自定义获取数据
      这个是支持计算层的分治,并行计算的核心(牢记)

5.HDFS设置高可用

为什么

为了提升NN的可靠性,如果集群当中只有一个NN,那么在某些情况下Namenode宕机,那么整个集群就不可用了,所以为了提升整个集群的可用性,我们设置两个NN,一主一备,确保主NN宕掉之后,备用NN能启动起来.

开始实现

要想让两个NN能够无缝切换,我们必须先实现两个进程的内存同步,有两种方法进行两个机器的进程同步:

  • 阻塞同步,就是需要同步数据的时候,主NN进入阻塞状态,等待备用NN同步完毕,然后继续运行,但是在现实生产生活中,这种方式显然是行不通的,我们保证了两个NN的强一致性,但是主NameNode的可用性却大大降低了,也就是Namenode同步数据的时候,我们无法使用这个NN.
  • 非阻塞异步,就是主NN需要同步数据时直接发给备用NN,同时NN保持运行接受客户端请求,等待备用NN同步完毕回调通知主NN.但是这样NN的一致性就无法保证

CAP定理

说到这么不得不谈谈CAP定理,即一致性(Consistency),可用性(Avalible),分区容错性(Partition tolerance)三者只可满足其二

  • 分区容错性,当分布式系统中遇到任何网络分区故障时,仍然需要能够保证对外提供满足一致性和可用性的服务,除非整个网络发生故障,通俗一点说,我将数据副本设置到多个节点上,其中一个节点故障了,因为其他节点持有数据副本,仍能对外保持可用一致的服务,我称这个分布式系统具有分区容忍性.
  • 可用性,是指系统提供的服务必须一直处于可用的状态,对于每一个从操作请求总是能够在有限的时间内返回结果.
  • 一致性,指的是数据在多个副本之间能够保持一致的特性,在一致性的需求下,当一个系统在数据一致的状态执行更新操作后,应该保证系统为数据仍然处于一致的状态

    CAP套用分析

    我们套用CAP定理再回顾一下之前NN的同步的两种方法,
  • 两个方法都满足分区容忍性,
  • 第一个阻塞同步方法,会影响可用性,导致系统没办法在系统规定时间内返回正确的响应结果,
  • 第二个异步非阻塞方法,没有办法保证一致性,异步回调没有办法保证备用的NN能完全将数据保持同步一致.

    折衷办法:最终一致性

    既然CAP没办法全部满足,那么我们能不能选择一个折衷的方法呢?
    当然有,NN就是使用这种方法,即通过Poxas选举算法保持数据的最终一致性
    首先,添加一个角色较jounralNode,当NN的命名空间有任何修改时,会告知大部分的JounralNodes进程,standby状态的NN会读取JNs中的信息,并监控EditLog的变化,把变化与应用于自己的命名空间.

    那如何保证数据的一致性呢?
    JounralNode一般会大于等于3的基数个,首先JounralNode根据少数服从多数的原则,选择出其中的Leader,Leader只有一个,它负责记录自己和其他的JounralNode是否接受到NameNode的变更信息,超过半数的JounralNode接收到NN的变更信息时,才承认作数据同步是可靠的.通知备用的NN读取JounralNode的信息.这样我们即保证了可用性,也保证了数据同步的最终一致性(弱一致性)

    zookeeper分布式协调系统自动切换

    上面我们解决的NN之间的数据同步问题,但是现实是active的NN出现故障的时候,我们只能手动其切换NN,所以我们还需要zookeeper的帮助.

    新增角色ZK(zookeeper)和ZKFC(ZookeeperFailOver)

    当我们启动zookeeper进程的时候,会有两个进程监控我们的NN,一个是zookeeper本身的进程,和JounralNode有着相似的选举算法,也是进程数必须超过zookeeper的奇数倍,zkfc要与zk进程保持心跳,而zkfc主要负责监控和切换主备的Namenode

    自动主备切换流程详述

    主备切换
  • 启动两个NN进程,此时两个NN都处于备用的状态
  • 启动zk进程和zkfc进程,当zk进程启动之后,zkfc进程争相在最近的zk进程上建立节点(抢锁),第一个成功建立节点的zkfc进程会将它操控的NN设置为Active,另一个就会被定义为standby
  • 假如zkfc挂掉了,zk进程监控不到zkfc的心跳,会将在zkfc建立的节点删掉,而监控standby的zkfc进程看到zk上的节点消失了,会主动建立节点,并先将active的NN节点降级为standby,自己监控的NN节点升级为active
  • 假如active的NN挂掉了,zkfc进程检测到会删除在zk上建立的节点,而监控standby的zkfc会在zk上建立节点,并测试连接另一个NN是否宕掉,最后将自己监控的NN升级为Active
  • 有一种特殊情况是,Active的NN可以运行可以和DataNode连接,但就是连接不上其他人主机的zkfc,当这个NameNode挂掉的时候,其他zkfc会一直处于阻塞状态,不断的尝试连接挂掉的NN,此时需要检查一下网卡硬件的问题

终端 => 文件管理器

在终端输入

1
nautilus ./

pc与手机链接(GSConnect方式)

主要是因为linux版QQ都是bug,linux也没有微信所以只能通过GSConnect链接手机

来相互传送文件.

Read more »

前情提要:
hexo优化:github+coding双搭建,域名CNAME别称指向

关于域名备案

首先向各位读者道歉,之前向大家推荐了Godaddy的域名注册,笔者发现注册完成之后并不是非常好用,官网难以打开,客服反映慢,并且也不提供备案服务

如果大家像笔者之前的那样注册了Godady的域名,请直接去阿里&腾讯云社区,搜索域名转入,进行相关操作,域名转入需要多交一年的域名租赁费用

如果申请国内阿里,腾讯云的,可以直接去备案,备案需要有域名提供商的云服务器,并且需要填写身份信息,备案申请,快的话一个星期才能申请下来

关于图床

怎么加入图片?

关于怎么在hexo框架中加入图片,百度肯定有很多方法,笔者也尝试过,比如

  • 直接加到github的库当中,然后通过链接引用,但是这样速度很慢,多次打开都是图片坏掉的小图标
  • 使用hexo-asset-image,作为非nodejs的程序员,安装之后,引用图片并没有显示,并且中间还报了各种依赖异常,本人不会解决,所以直接跳过
  • 踩坑之后笔者决定选择用云存储做图床,考虑到成本问题选择七牛云,完全免费

七牛云

关于七牛云的注册,登录,以及申请对象存储,这里笔者不再多赘述,看着官方文档说明都可以做到,这里主要聊一聊申请过程中需要注意的问题

  • 千万不要使用顶级域名绑定,可以在DNSpod域名解析那里添加一个子域名,比如这个

picture_sub_domain

  • 上传图片之前记得添加前缀,多个路径用/隔开

pic_path

  • 多图上传推荐用上传工具PicGo

PicGo的linux安装方法

PicGo的github链接

windows和mac直接在release里面下载相应的exe和dmg就可以

linux推荐下载appImage后缀的安装包

下载完成后,操作如下

1
2
3
cp PicGo-2.3.0-beta.3.AppImage /usr/bin/PicGo
chmod +x /usr/bin/PicGo
PicGo

然后直接命令输入PicGo运行就可以了

如果出现如下异常

1
xclip not found

控制台输入

1
sudo apt install xclip -y

安装xclip即可

这里放一个PicGo的配置参考

conf_pic_go

存储区域华南,华北那些在官方文档有对应的参考映射值

官方配置指南

关于图片加水印

为了防止别人盗图,笔者特意写了一个加水印的脚本,逻辑很简单的一个shell脚本,基于image magic库对图片进行编辑,下面放出地址

https://github.com/Lemcoden/blog.tools

前情提要

jvm的轻量级爽口讲解–内存管理子系统(俗称垃圾回收)〇贰

前言

1
表示博客已经优化到博主比较满意的程度,图片加载问题已解决,jvm系列文章的封面也使用了自己设计的封面,首页菜单添加jvm和blog建站的专栏.虽然说的有点像枯燥的开发日志,但是能看到自己的东西越做越好真是由衷的发自内心的高兴,在这里博主祝大家端午节快乐,来口粽子,来口jvm小菜(恩,没毛病)

上次没吃完的一口粽子之安全点不够用?

上次我们讲了,jvm虚拟机一般是在并发标记回收时,通过设置安全点来实现用户线程的停顿,确切的说是主动式中断的安全点(设置一个轮询问标志,一旦发现论询标志为真时,就跑到附近的安全点去挂起.),但是遇到程序暂停执行的情况就不够用了,比如说线程的sleep和blocked状态,这个时候,他是无法响应程序的中断请求的.

因此我们还需要另一种方式,来让线程中断,这就是安全区域,其实完全就可以理解为被拉成线的安全点,当线程跑到安全区域时,会标识自己已进入,这样回收线程启动,就不会再管安全区域的对象,而在回收线程运行期间,安全区域的线程没有收到回收完毕的信号,是不会离开安全区域的,这就保证了安全区域的安全性(有点套娃)

刚蒸好的新粽子之新老年代如何同时收集?

之前我们介绍了"经典"(这里是新老年代分代,如果把到目前为止可以稳定使用定义为经典的话,需要再加上G1回收器)的回收器,都是新老年代分管,并进行组合使用的.但是那之后的新型回收器都是新老年代同时进行收集的,他们是如何做到呢?
让我们重新再回到理论阶段来解读

我们已知的分代理论出自分带收集理论,当时有两个假说做支撑:

强分代假说:绝大多数对象都是朝生夕灭的

弱分代价说:熬过多次垃圾回收的对象,就越难以消亡.

这和我们现在的垃圾回收流程相对应:

  1. 开始生成的对象会放到eden区当中,等eden区满,触发一次minor GC(有时候也叫young GC),熬过第一次回收的对象会放到survivor区.
  2. 等suvivor区满,再进行一次minor GC,并计算幸存对象熬过GC的次数,将熬过多次(默认15次,可用参数调节)幸存对象放到old区中.
  3. 老年代区域满了收集器触发Full GC,回收整个堆以及方法区内存.

讲到这里大家就要问了,老年代区域满了为什么不直接回收老年代区域的内存?

问的好!话说完全是你自己自问自答啊!!!这是因为可能会出现老年代引用新生代的对象,即出现跨代引用问题.
如果出现跨代引用,我们回收老年代的引用同时,势必要查询到引用到年轻代的对象,因此会连带年轻代的对象同时回收
,所以老一代的回收器都是old区域满了,进行一次FullGC.
(在这里多嘴一句,FullGC很容易和 MajorGCOldGC搞混,原因是CMS回收器之前是没有Old GC这个说法的,Old区满直接Full GC,目前只有CMS收集器能进行所谓的old GC,即只回收老年代的内存,所以CMS收集器出来之后,大家就old区满这个原因,混淆了old GC 和Full GC的说法,而Major GC更加说不清楚,各个资料各有个的说法)
所以就跨代引用问题,在这之后又出现了一个新的假说,叫做

  • 跨代引用假说: 存在相互引用关系的两个对象,是应该倾向于同时生存和同时消亡.

举个栗子,如果新生代存在跨代引用,那么回收的时候,老年代势必会使新生代对象得以存活,那么我们就不必在为了少量的跨带引用去回收老年代,我们只需要在新生代上建立一个全局的数据结构(叫做记忆集),当我们触发minor GC 的时候,只有包含跨带引用的小块内存里的对象才会被加入到GC Roots进行扫描回收.

基于记忆集我们也可以实现老年代的独立GC(CMS收集器).也可以实现新老年代的同时收集(G1收集器)

最后一口肉粽子之经典收集器的控制参数

好的,新的理论假说已经聊清楚了,我们再回到之前讲的经典收集器.之前所介绍的经典收集器分为串行和并行?不!你没讲过(其实就是单核和多核并行)我们按收集器的顺序,再把收集器的调节参数讲一讲

参数 描述 所属收集器
-XX:SuvivorRatio 新生代中Eden区和Suvivor区域的容量比,默认数值为8,Eden:Survivor=8:1 Serial& ParNew&Parallel Scavenge
-XX:PretenureSizeThreshold 设置直接晋升到老年代的老年代的对象大小,设置这个参数后,大于这个参数的对象将在老年代分配,单位字节 所有经典收集器
-XX:MaxTenuringThreshold 设置晋升到老年代的年龄,每躲过一次Minor GC对象的年龄+1,默认此项不设置 所有经典收集器
-XX:+UseAdaptiveSizePolicy 动态调整java堆区域的大小以及进入老年代的年龄还有停顿时间和吞吐量,特别适合新手 Parallel Scavenge
-XX:GCTimeRatio GC时间站总时间的比率,默认为99, 即允许1%的GC时间 Parallel Scavenge
-XX:MaxGCPauseMillis 设置GC最大的停顿时间 Parallel Scavenge
-XX:CMSInitiatingOccupancyFraction 设置CMS老年代的空间被使用多少时进行收集,默认值为68% CMS
-XX:+UseCMSCompactAtFullCollection 完成垃圾回收后是否进行一次内存碎片整理 CMS
-XX:CMSFullGCsBeforeCompact 设置进行多少次垃圾回收之后再启动一次内存碎片整理 CMS

列完参数我们发现,大部分的调节参数集中在Paraellel Scavenge 收集器和CMS收集器当中,那我们再聊聊这俩个收集器

Parallel Scavenge 收集器

Parallel Scavenge 收集器,恩命名方式突然多了一个Scavenge一定有特殊之处,的确如此,Scavenge 收集器提供了两个重要的可控指标给我们,那就是吞吐量和停顿时间,这是之前提到过的,拿第一个来说,

  • 程序的吞吐量=运行用户代码的时间/(运行用户代码的时间+运行垃圾收集的时间)

而在Scavenge收集器当中的参数就是GCTimeRatio 表示GC回收的时间比率,正好和吞吐量取倒,比如说这个参数取19,那么他所占的用户时间比率为1/(1+19),即百分之五

而Scavenge的另一个参数便是,MaxGCPauseMills GC最大的停顿时间,这个没有什么可讲的,单位为毫秒

不过这个参数在使用的时候需要注意,停顿时间和吞吐量的参数设置是矛盾的,如果停顿时间设置比较小,虚拟机为了保持回收效率,会增加回收次数,这样回收次数*回收时间=总的回收时间,如果停顿时间设太短,回收次数将增加很多,其最后的结果势必会使吞吐量也降下来.

UseAdaptiveSizePolicy,这个参数说明的已经很详细,不再多赘述,如果你对收集器不了解,请务必把这项参数打开.

CMS收集器

CMS收集器,是一个跨时代意义的收集器,因为自它开始,之后的收集器都实现了并发标记并发收集的功能,它自己也如此,但需要注意的是即使并发收集也有一些停顿避免不了(初始标记GC Root对象时,也会产生停顿).好了,让我们来看看它的参数
CMSInitiatingOccupancyFraction 既然是并发执行,那么我们不能等老年代区满了再去执行,因为在回收过程中用户线程还在执行,我们需要预留一部分空间给用户线程,所以才有这个参数,jdk5 默认值为68,jdk6默认值为92,jdk6设置有些风险,因为在程序运行过程中,如果回收之后的老年代空间不足以用户使用,会出现"并发失败",这时会临时启用Serial Old收集器,势必会拖慢用户线程的运行.

+UseCMSCompactAtFullCollection 和 CMSFullGCsBeforeCompact 这两个参数同时讲,我们知道CMS收集器是基于标记-清除算法的,这可能会导致老年代没有连续的空间来存储大对象而导致full GC.所以就需要通过这两个参数来设置和整理(两个参数在jdk9开始废弃)

最后简单总结一下问题

  • 安全点有了,为什么需要安全区域?
  • 新生代老年代如何一同收集?
  • 经典收集器的参数?
  • 简单说一下Parallel Scavenge 收集器和CMS收集器的特征.