Perl Perl 多进程实战

scmroad · 发布于 2010年4月22日 · 172 次阅读
96

perl作为一种解释性的语言,很受广大系统管理员的欢迎,好处么就不多说了,坏处也有不少,例如对线程的支持,就一直不咋地,所以大多数情况下,我们都需要多个进程,来帮助我们完成工作,闲话少说,上代码。 #!/usr/bin/perl

test_proc.pl

test multi process

create by lianming: 2009-08-12

use strict; use warnings;

== fork a new process ==

my $pid = fork(); if (!defined($pid)) { print "Error in fork: $!"; exit 1; } if ($pid == 0) { ## == child proc == print "Child: My pid = $$\n"; sleep(5); print "Child: end\n"; exit 0; } else { ## == parent proc == print "Parent My pid = $$, and my child's pid = $pid\n"; sleep(5); print "Parent: end\n"; } exit 0; 执行结果如下: Child: My pid = 19481 Parent My pid = 19480, and my child's pid = 19481 (5秒钟等待) Child: end Parent: end 父进程派生子进程,之需要一条命令,那就是fork,fork函数的返回值赋给一个变量,上例中赋给了"$pid",接下来,就要根据$pid值的不同,来分三种情况了。 1、fork失败的情况:这个时候,$pid处于未定义的状态,上例中做的一个"if (!defined($pid))"的判断,如果为真,说明$pid未定义,fork失败,这个时候就要打印错误信息,并且退出。 2、子进程:如果是子进程,那么$pid的值为0,就是上例中"if ($pid == 0)"条件为真的状况,在"$pid == 0"的时候,那就都是子进程了,上例中,子进程将自己的pid打出来,为19481。 3、父进程:如果是父进程,那么$pid的值为它派生出的子进程的pid,也就是不为0,就是else的情况,上例中把$pid打出来,可以看到,也是 19481,就是子进程的pid值。 这只是一个最简单的例子,一个父进程派生一个子进程,再稍微复杂一点,一个父进程派生多个子进程,代码如下: #!/usr/bin/perl

test_proc_1.pl

test multi process

create by lianming: 2009-08-12

use strict; use warnings;

for (my $i = 0; $i < 10; $i ++) { ## == fork a new process == my $pid = fork(); if (!defined($pid)) { print "Error in fork: $!"; exit 1; } if ($pid == 0) { ## == child proc == print "Child $i : My pid = $$\n"; sleep(5); print "Child $i : end\n"; exit 0; } sleep(1); } exit 0; 这个例子就是,父进程执行一个循环,每次循环都fork一个子进程,子进程执行完以后退出,每次循环都等待1s,循环10次。 执行结果如下: Child 0 : My pid = 20499 Child 1 : My pid = 20500 Child 2 : My pid = 20501 Child 3 : My pid = 20502 Child 4 : My pid = 20503 Child 0 : end Child 5 : My pid = 20506 Child 1 : end Child 6 : My pid = 20507 Child 2 : end Child 7 : My pid = 20508 Child 3 : end Child 8 : My pid = 20509 Child 4 : end Child 9 : My pid = 20510 Child 5 : end [root@localhost /tmp]

Child 6 : end

Child 7 : end Child 8 : end Child 9 : end 每个子进程耗时5s,那么执行完总共需要的是15s。 但是,这样的代码会导致一个问题,在执行的过程中,可以在另外的tty上输入ps auxf来查看当前的进程状态,会发现类似这样的东东: root 20531 0.0 0.0 8460 1704 pts/2 S+ 21:46 0:00 _ perl test_proc_1.pl root 20532 0.0 0.0 0 0 pts/2 Z+ 21:46 0:00 _ [perl] root 20535 0.0 0.0 0 0 pts/2 Z+ 21:46 0:00 _ [perl] root 20536 0.0 0.0 0 0 pts/2 Z+ 21:46 0:00 _ [perl] root 20539 0.0 0.0 0 0 pts/2 Z+ 21:46 0:00 _ [perl] root 20541 0.0 0.0 8460 720 pts/2 S+ 21:46 0:00 _ perl test_proc_1.pl root 20543 0.0 0.0 8460 720 pts/2 S+ 21:46 0:00 _ perl test_proc_1.pl root 20545 0.0 0.0 8460 720 pts/2 S+ 21:46 0:00 _ perl test_proc_1.pl root 20546 0.0 0.0 8460 720 pts/2 S+ 21:46 0:00 _ perl test_proc_1.pl root 20548 0.0 0.0 8460 720 pts/2 S+ 21:46 0:00 _ perl test_proc_1.pl 有4个进程,状态为Z,意思就是僵尸进程,而正常的程序,是不应该出现僵尸进程的。 正常情况下,子进程的退出需要做两件事情,第一,子进程exit,发出一个信号给自己的父进程,第二,父进程对子进程进行回收,如果父进程已经不存在了,那子进程会将init,也就是linux中第一个进程作为自己的父进程,init会代替它的父进程对子进程进行回收。 我们的情况就是,子进程已经调用了exit,但是父进程并没有对它进行回收,如果父进程持续fork子进程,那僵尸进程就会越来越多,越来越多,最后会导致什么后果,我就不说了。 父进程回收子进程的函数有两个: wait,和waitpid wait函数比较简单,没有任何参数,调用以后,父进程会停住,然后等待子进程返回。如果没有子进程,返回-1 waitpid有两个参数,第一个参数为要等待的子进程的pid值,另外一个是flag,一般来讲,第一个参数为-1,意思就是等待所有的子进程。调用方法如下: $procid = fork(); if ($procid == 0) {

== child process ==

print ("this line is printed first\n"); exit(0); } else {

== parent process ==

waitpid ($procid, 0); print ("this line is printed last\n"); } 其实,最主要的是让父进程知道,什么时候才需要去回收已经退出的子进程,因为父进程也是有很多活需要忙的。 这个可以通过信号来实现,子进程在退出的时候,会向父进程发送一个信号,我们只要捕获了这个信号,就知道,有些子进程需要回收啦。例子如下: #!/usr/bin/perl

test_proc_2.pl

test multi process

create by lianming: 2009-08-12

use strict; use warnings; use POSIX ":sys_wait_h";

== number of zombies proc ==

my $zombies = 0; my $collect;

== get the child signal ==

$SIG{CHLD} = sub { $zombies++ };

for (my $i = 0; $i < 10; $i ++) { ## == fork a new process == my $pid = fork(); if (!defined($pid)) { print "Error in fork: $!"; exit 1; } if ($pid == 0) { ## == child proc == print "Child $i : My pid = $$\n"; sleep(5); print "Child $i : end\n"; exit 0; } ## == if need to collect zombies == if ($zombies > 0) { while (($collect = waitpid(-1, WNOHANG)) > 0) { $zombies --; } } sleep(1); } exit 0; 执行结果和原先一样: Child 0 : My pid = 21552 Child 1 : My pid = 21553 Child 2 : My pid = 21554 Child 3 : My pid = 21555 Child 4 : My pid = 21556 Child 0 : end Child 5 : My pid = 21558 Child 1 : end Child 6 : My pid = 21570 Child 2 : end Child 7 : My pid = 21572 Child 3 : end Child 8 : My pid = 21574 Child 4 : end Child 9 : My pid = 21575 Child 5 : end [root@localhost /tmp]

Child 6 : end

Child 7 : end Child 8 : end Child 9 : end 但是ps auxf的结果就有很大差别了: root 21551 0.1 0.0 8280 2672 pts/2 S+ 22:06 0:00 _ perl test_proc_2.pl root 21558 0.0 0.0 8280 1168 pts/2 S+ 22:07 0:00 _ perl test_proc_2.pl root 21570 0.0 0.0 8280 1168 pts/2 S+ 22:07 0:00 _ perl test_proc_2.pl root 21572 0.0 0.0 8280 1168 pts/2 S+ 22:07 0:00 _ perl test_proc_2.pl root 21574 0.0 0.0 8280 1168 pts/2 S+ 22:07 0:00 _ perl test_proc_2.pl root 21575 0.0 0.0 8280 1168 pts/2 S+ 22:07 0:00 _ perl test_proc_2.pl 僵尸进程不会存在了。 $SIG{CHLD} = sub { $zombies++ }; 这条语句,其实就是捕获了子进程退出的时候,向父进程发出的信号,捕获以后,就给一个变量($zombies)加1。 如果"$zombies"不为0的时候,那就说明,有子进程退出了,需要进行回收,那父进程就调用waidpid函数,进行一次回收,每回收一个子进程,就给这个变量减去1,这样当"$zombies"减为0的时候,就说明所有的僵尸进程都已经回收了。bingo! 有的时候,我们只是执行一定量的任务,只管fork就可以了,但是某些时候,我们有太多任务需要执行,要一直持续的fork好多子进程,但是我们希望把子进程的数目控制在一个范围内,比如说,我一个任务,需要有100个子进程来执行,但是我不能100个进程全部fork出去,这样太占用资源了,所以我希望把进程数量控制在10个以内,当第一个进程退出以后,我再fork第11个进程,例子如下: #!/usr/bin/perl

test_proc_3.pl

test multi process

create by lianming: 2009-08-12

use strict; use warnings; use POSIX ":sys_wait_h";

== number of proc ==

my $num_proc = 0;

== number of collected ==

my $num_collect = 0; my $collect;

== get the child signal ==

$SIG{CHLD} = sub { $num_proc-- }; for (my $i = 0; $i < 10; $i ++) { ## == fork a new process == my $pid = fork(); if (!defined($pid)) { print "Error in fork: $!"; exit 1; } if ($pid == 0) { ## == child proc == print "Child $i : My pid = $$\n"; sleep(5); print "Child $i : end\n"; exit 0; } $num_proc ++; ## == if need to collect zombies == if (($i-$num_proc-$num_collect) > 0) { while (($collect = waitpid(-1, WNOHANG)) > 0) { $num_collect ++; } } do { sleep(1); } until ($num_proc < 3); } exit 0; 执行结果如下: Child 0 : My pid = 22641 Child 1 : My pid = 22642 Child 2 : My pid = 22643 Child 0 : end Child 3 : My pid = 22645 Child 1 : end Child 4 : My pid = 22647 Child 2 : end Child 5 : My pid = 22658 Child 3 : end Child 6 : My pid = 22660 Child 4 : end Child 7 : My pid = 22661 Child 5 : end Child 8 : My pid = 22663 Child 6 : end Child 9 : My pid = 22664 Child 7 : end [root@localhost /tmp]

Child 8 : end

Child 9 : end 同时,看到的ps auxf的输出如下: root 22640 0.0 0.0 8116 2672 pts/2 S+ 22:28 0:00 _ perl test_proc_3.pl root 22660 0.0 0.0 0 0 pts/2 Z+ 22:29 0:00 _ [perl] root 22661 0.0 0.0 8116 1168 pts/2 S+ 22:29 0:00 _ perl test_proc_3.pl root 22663 0.0 0.0 8116 1168 pts/2 S+ 22:29 0:00 _ perl test_proc_3.pl root 22664 0.0 0.0 8116 1168 pts/2 S+ 22:29 0:00 _ perl test_proc_3.pl 第一个子进程需要5s才能退出,如果1s执行一次fork的话,那么同时应该有5个子进程,但是本例中只有三个,那就是说实现了对进程数量的控制。 本例中定义了几个变量: $num_proc:正在活动的进程数量,控制在3个以内,所以在父进程每次fork完子进程后,都会检查这个变量,如果超出了3个,那就等一会。当父进程fork了新子进程的时候,这个数字会增加,当子进程退出以后,父进程捕获了信号,这个数字会减少。 $num_collect:已回收的进程数量,每回收一个子进程,变量加一。 $i:已经fork的进程数量。 $num_proc和$num_collect的和应该是等于$i的,如果不等于了,那就说明,有子进程需要回收了。 进程的控制还算简单吧? 有的时候,进程和进程之间是需要通讯的,进程不像线程,整个内存空间都是共享的,但是linux也提供了多种进程之间通信的方式,最简单的方式,就是存在文件里,下一篇文档就讨论一种文件存储的方法。

共收到 2 条回复
96
scmroad · #1 · 2010年4月22日

每个进程都独立拥有自己的资源,包括内存页、文件指针等等,同时,linux系统提供了多种进程之间的交互方式,比较简单的方式就是写到文件里,DB_File就是一种解决方案。

#!/usr/bin/perl

test create of DB_File

create by lianming: 2009-08-10

last modify by lianming: 2009-08-13

use strict; use warnings; use DB_File;

my %hash; my $file_name = "test_btree"; unlink $file_name;

tie(%hash, 'DB_File', $file_name, O_CREAT|O_RDWR, 0666, $DB_BTREE) || die "Cannot open $file_name: $!\n";

== Add some info ==

$hash{"me"} = "lianming"; $hash{"else"} = "nothing"; $hash{"job"} = "monitor";

print "Print as hash:\n"; foreach my $key (keys (%hash)) { print "$key->$hash{$key}\n"; } print "End\n";

untie %hash;

执行结果为:

Print as hash: else->nothing job->monitor me->lianming End

tie命令,本来是用于变量绑定,在DB_File中,是将一个哈希变量,和一个文件绑在了一起,简单的来讲,我们可以认为变量'%hash',是存在这个文件中的,所以我们对这个哈希变量进行的所有修改,它都会在文件中做相应的操作。 tie的参数,第一个为要绑定的变量 第二个为要绑定的文件 第三个为flag(是否创建,只读,只写,可读写) 第四个为权限(和linux文件的权限类似,不过没有执行,最大是6) 第五个为存储引擎,有三种,hash、btree、recno,一般来讲,hash和btree是存储key/value类型数据的,而recno是顺序存储,相当于数组。使用上的区别,请看下文。

在DB_File中添加信息,只要直接给哈希变量添加信息即可,和一般的赋值是一样的。 DB_File也提供了类似BerkeleyDB的方法来添加对象,要将tie的结果返回给一个值,例如$db,然后利用$db->put($key, $value)的方式来添加,这种办法我不是很习惯用。

同理,在DB_File中读取信息,就直接从哈希变量中读取就ok了。

这样,我们就已经把信息存在了文件中,下次,如果另外一个进程要来修改这个文件,直接打开即可。

#!/usr/bin/perl

test read/write of DB_File

create by lianming: 2009-08-10

last modify by lianming: 2009-08-10

use strict; use warnings; use DB_File;

my %hash; my $file_name = "test_btree";

tie(%hash, 'DB_File', $file_name, O_WRONLY, 0666, $DB_BTREE) || die "Cannot open $file_name: $!\n";

== Add some info ==

for (my $i = 0; $i < 10; $i ++) { $hash{$i} = $i."new"; }

print "Print as hash:\n"; foreach my $key (keys (%hash)) { print "$key->$hash{$key}\n"; } print "End\n";

untie %hash;

执行结果如下:

Print as hash: 0->0new 1->1new 2->2new 3->3new 4->4new 5->5new 6->6new 7->7new 8->8new 9->9new else->nothing job->monitor me->lianming End

只要记住,打开文件的时候要和创建的时候采用同样的存储算法,否则会报无法打开文件的错。

一般来讲,哈希是不允许存储重复键的,就是说,对同一个key,赋两次value,那么后一次赋值会将前一次赋的值给覆盖掉,但是btree是可以存储这样的信息的。

#!/usr/bin/perl

test duplicate keys of DB_File

create by lianming: 2009-08-10

last modify by lianming: 2009-08-10

use strict; use warnings; use DB_File;

my %hash; my $file_name = "test_btree"; unlink $file_name;

== enable duplicate records

$DB_BTREE->{'flags'} = R_DUP;

tie(%hash, 'DB_File', $file_name, O_CREAT|O_WRONLY, 0666, $DB_BTREE) || die "Cannot open $file_name: $!\n";

== Add some info ==

$hash{'me'} = "Sangyb"; $hash{'me'} = "Lianming"; $hash{'me'} = "Here";

print "Print as hash:\n"; foreach my $key (keys (%hash)) { print "$key->$hash{$key}\n"; } print "End\n";

untie %hash;

执行结果如下:

Print as hash: me->Sangyb me->Sangyb me->Sangyb End

问题出现了……虽然它存储了3个相同的key,但是我明明存储的是不同的value,打出来的却是相同的value。 原因是一个叫做联想数组的东西,当你请求同样的key的时候,只会返回第一个value。所以,我们的数据其实已经存储好了,只是在读出来的时候,由于某些原因,出现了这样的问题。

这个时候,就必须利用api来进行操作了。 它提供了一个seq的函数,用来对btree中的对象进行指向:

#!/usr/bin/perl

test duplicate keys of DB_File

create by lianming: 2009-08-10

last modify by lianming: 2009-08-10

use strict; use warnings; use DB_File;

my %hash; my $file_name = "test_btree"; unlink $file_name;

== enable duplicate records

$DB_BTREE->{'flags'} = R_DUP;

my $db = tie(%hash, 'DB_File', $file_name, O_CREAT|O_WRONLY, 0666, $DB_BTREE) || die "Cannot open $file_name: $!\n";

my ($k, $v, $status);

== Add some info ==

$hash{'me'} = "Sangyb"; $hash{'me'} = "Lianming"; $hash{'me'} = "Here";

print "Print as api:\n";

== use api: seq ==

$k = $v = 0; for ($status = $db->seq($k, $v, R_FIRST); $status == 0; $status = $db->seq($k, $v, R_NEXT)) { print "$k->$v\n"; }

print "End\n";

undef $db; untie %hash;

执行结果如下:

Print as api: me->Sangyb me->Lianming me->Here End

在读取btree文件的时候,会有一个指针指向一个特定的key/value对,然后读取,写入的操作,就对这个指针指向的pair进行操作。seq函数就是控制这个指针的移动,它有三个参数: 第一个,指针指向的pair的key 第二个,指针指向的pair的value 第三个,flag,调用函数时,指针做的操作: R_CURSOR:当前 R_FIRST:最前面的一个pair R_LAST:最后的一个pair R_NEXT:下一个pair R_PREV:前一个pair 如果调用失败了,就会返回0。

针对duplicate key,还有别的几个实用的函数,例如get_dup,del_dup等。 get_dup可以获取重复键的一些信息,del_dup可以删除特定的pair。

当多个进程同时对一个文件进行操作的时候,必然要涉及到一个锁的问题,多个进程如果同时对一个文件进行写,那么肯定会挂掉,我们可以做一些测试。

1、两个进程同时进行读操作

#!/usr/bin/perl

test multi read of DB_File

create by lianming: 2009-08-10

last modify by lianming: 2009-08-10

use strict; use warnings; use DB_File;

my %hash; my $file_name = "test_btree"; unlink $file_name;

my $db = tie(%hash, 'DB_File', $file_name, O_CREAT|O_WRONLY, 0666, $DB_BTREE) || die "Cannot open $file_name: $!\n";

== Add some info ==

for (my $i = 0; $i < 20; $i ++) { $hash{$i} = $i."test"; }

== multi read ==

my $pid = fork();

if (!defined($pid)) { print "Fork error: $!\n"; exit 1; }

if ($pid == 0) { ## == child == my $key;

for (my $i = 0; $i < 1000; $i ++) { $key = int(rand()*20); print "Child: $key->$hash{$key}\n"; } undef $db; untie %hash; exit 0; } else { ## == parent == my $key;

for (my $i = 0; $i < 1000; $i ++) { $key = int(rand()*20); print "Parent: $key->$hash{$key}\n"; } }

undef $db; untie %hash;

执行结果是正常的,child和parent会交替出现,打出正确的读取结果。

... Parent: 15->15test Child: 1->1test Parent: 5->5test Child: 10->10test Parent: 3->3test Child: 5->5test Parent: 4->4test Child: 9->9test Parent: 17->17test Child: 17->17test Parent: 9->9test Child: 7->7test Parent: 15->15test Child: 2->2test Parent: 15->15test ...

2、两个进程同时写

#!/usr/bin/perl

test multi write of DB_File

create by lianming: 2009-08-10

last modify by lianming: 2009-08-10

use strict; use warnings; use DB_File;

my %hash; my $file_name = "test_btree"; unlink $file_name;

my $db = tie(%hash, 'DB_File', $file_name, O_CREAT|O_WRONLY, 0666, $DB_BTREE) || die "Cannot open $file_name: $!\n";

== multi read ==

my $pid_1 = fork();

if (!defined($pid_1)) { print "Fork error: $!\n"; exit 1; }

if ($pid_1 == 0) { ## == child == my $key;

for (my $i = 0; $i < 1000; $i ++) { $key = int(rand()*10); $hash{$key} = $key."abc"; } undef $db; untie %hash; exit 0; }

my $pid_2 = fork();

if (!defined($pid_2)) { print "Fork error: $!\n"; exit 1; }

if ($pid_2 == 0) { ## == child == my $key;

for (my $i = 0; $i < 1000; $i ++) { $key = int(rand()*10)+10; $hash{$key} = $key."abc"; } undef $db; untie %hash; exit 0; }

wait();

== parent ==

my $cnt = 0;

foreach my $key (keys(%hash)) { print "$key->$hash{$key}\n"; $cnt ++; } print "\ncount is $cnt\n";

undef $db; untie %hash;

执行结果如下:

10->10abc 11->11abc 12->12abc 13->13abc 14->14abc 15->15abc 16->16abc 17->17abc 18->18abc 19->19abc

count is 10

我们本来是想让他有20个值的,结果只有第二个子进程的修改生效了,第一个子进程的修改并没有生效。

我们可以得到一个结论,那就是DB_File可以支持多个进程同时读,但是无法同时写。 具体原因,我只能猜测一下了,那就是tie,在文件开始的tie,是将文件内容和哈希变量绑定,我们可以认为,其实就是一个指针,指到了文件的开始位置,读的话,是互不干涉的,所以可以多个进程同时读。但是写的时候,也许写的操作并不是直接写文件,而是修改哈希的值,最后在untie,或者需要的时候,才会把对哈希的操作刷到文件中(也可以用$db->sync来实现),所以,虽然两个进程都对文件进行了写的操作,但是只有最后untie的一个才会生效。 如果要测试的话,可以在第二个子进程undef之前,sleep几秒钟,就可以看到,生效的就是第一个进程的写入了。 当然,这只是最简单的情况,我测试过多个进程同时对它进行大量的写操作,那最后文件就会乱七八糟,甚至有可能没法打开。

这种情况下,是需要对文件上锁的,在锁住文件之后,才对它进行tie,在解锁之前,要进行untie。有两个给DB_File上锁的包,就拿 DB_File::Lock来做例子:

#!/usr/bin/perl

test lock of DB_File

create by lianming: 2009-08-10

last modify by lianming: 2009-08-10

use strict; use warnings; use DB_File; use DB_File::Lock; use Fcntl qw(:flock O_RDWR O_CREAT);

unlink $file_name;

my $pid_1 = fork();

if (!defined($pid_1)) { print "Fork error: $!\n"; exit 1; }

if ($pid_1 == 0) { ## == child == my $key;

for (my $i = 0; $i < 1000; $i ++) { $key = int(rand()*12)+122;

## == lock and tie == tie(%hash, 'DB_File::Lock', $file_name, O_CREAT|O_WRONLY, 0666, $DB_BTREE, "write") || die "Cannot open $file_name: $!\n"; $hash{$key} = $key."abc"; untie %hash; } exit 0; }

my $pid_2 = fork();

if (!defined($pid_2)) { print "Fork error: $!\n"; exit 1; }

if ($pid_2 == 0) { ## == child == my $key;

for (my $i = 0; $i < 1000; $i ++) { $key = int(rand()*10)+10;

## == lock and tie == tie(%hash, 'DB_File::Lock', $file_name, O_CREAT|O_WRONLY, 0666, $DB_BTREE, "write") || die "Cannot open $file_name: $!\n"; $hash{$key} = $key."abc"; untie %hash; } exit 0; }

wait();

== parent ==

tie(%hash, 'DB_File::Lock', $file_name, O_CREAT|O_RDONLY, 0666, $DB_BTREE, "read") || die "Cannot open $file_name: $!\n";

my $cnt = 0;

foreach my $key (keys(%hash)) { print "$key->$hash{$key}\n"; $cnt ++; } print "\ncount is $cnt\n";

untie %hash;

执行结果为:

10->10abc 11->11abc 12->12abc 122->122abc 123->123abc 124->124abc 125->125abc 126->126abc 127->127abc 128->128abc 129->129abc 13->13abc 130->130abc 131->131abc 132->132abc 133->133abc 14->14abc 15->15abc 16->16abc 17->17abc 18->18abc 19->19abc

count is 22

这就是我们想要的结果。在tie的时候,同时上锁,在untie的时候解锁。和DB_File的tie参数一样,最后加一个read或者write就ok 锁有两种,read和write,read锁可以多个进程同时持有,但是write锁只能一个进程持有。

如果说btree和hash的存储算法,是相当于把哈希变量存在了文件中,那recno就是把数组变量存在了文件中,recno因为用的不多,也没有具体看过。 对DB_File的性能做一些测试,具体测试代码和结果就不写了,如果大家有兴趣可以自己去搞一下,只写一下测试的结果。 1、测试随机的写性能:分别用两种引擎,单进程对一个文件随机写入100w条数据,测试结果如下: HASH:耗时53s,文件大小为83M BTREE:耗时12s,文件大小为120M 可见写的话,btree性能稍好,但是占用空间较大。 2、测试随机读的性能:分别用两种引擎,单进程对一个文件随机写入100w条数据,测试结果如下: HASH:耗时28s BTREE:耗时28s 读性能两个引擎差不多。 3、并发性能: a、用BTREE引擎,开启10个进程,每个进程对一个文件写入1w条数据,写入每条数据之前加锁,写完后解锁。测试结果耗时1000多s,可见它的并发性是何等的差劲…… b、用BTREE引擎,单进程,对一个文件写入10w条数据,写入每条数据前加锁,写完后解锁。测试结果是2分钟。

可见,单进程的读写,还是很快的,但是涉及到加锁后就很慢很慢,慢的主要是tie和untie的过程,这部分的过程涉及到文件的读写,没有任何并发的机制,所以只有等每次锁了之后才可以tie和untie,这部分的时间消耗是很可观的。并发的问题,其实有另外的办法可以解决,下一篇文档就讨论一种并发性很高的文件存储策略。

96
scmroad · #2 · 2010年4月22日

本来说是要讨论文件存储的,但是要穿越一下,因为我觉得IPC更加基础一点,特别是锁这方面,其实用Semaphore都是可以代替的,不过,ipc的话,废话可能就要多一点点了。 IPC就是用来进程间通讯用的东东,linux下,分为三类:消息队列、信号量、共享内存,在linux下,用ipcs命令就可以看到当前系统已经创建的 ipc,ipcrm,可以删除已经创建的ipc,当然,要跟id的。既然它分为三类,那就要分三种类型来做讲解了。 首先是消息队列。消息队列比较简单,就是把数据放到队列里面,按照先进先出,后进后出的序列来。消息队列用的人已经很少了,它的大部分功能,都可以由共享内存来替代,这里就不举例子了。 信号量,用的应该是比较多的,主要的作用就在于锁,代码如下:

#!/usr/bin/perl use strict qw(vars); use warnings; use IPC::SysV qw(S_IRWXU IPC_CREAT); use IPC::Semaphore; my $sem0; my $sem1; my $sem = IPC::Semaphore->new(1564, 2, S_IRWXU|IPC_CREAT) || die "IPC::Semaphore->new: $!\n"; $sem0 = $sem->getval(0); $sem1 = $sem->getval(1); print "Sem0: $sem0\n"; print "Sem1: $sem1\n"; $sem->setval(0,10); $sem->setval(1,100); $sem0 = $sem->getval(0); $sem1 = $sem->getval(1); print "Sem0: $sem0\n"; print "Sem1: $sem1\n"; $sem->op(0,-1,IPC_NOWAIT) || die "sem setval: $!\n"; $sem->op(1,10,IPC_NOWAIT) || die "sem setval: $!\n"; $sem0 = $sem->getval(0); $sem1 = $sem->getval(1); print "Sem0: $sem0\n"; print "Sem1: $sem1\n"; 执行结果如下: Sem0: 0 Sem1: 0 Sem0: 10 Sem1: 100 Sem0: 9 Sem1: 110 在刚开始,定义一个信号量: 第一个参数是key,它的key是1564,这个是随便选的,如果再别的进程里也需要连接这个信号量,那么就用相同的key连接即可。 第二个参数是信号量的数量,上例中是两个,信号量可以看做是在内存里存放的一个纯数字的数组,这个数量,那就是你的数组大小,可以包含多少数字。 第三个参数是flag,定义权限等,S_IRWXU,意思是创建它的用户可读可写可执行,如果是S_IRWXG,则是属组,如果是S_IRWXO,则是所有人。IPC_CREAT代表如果不存在,则创建。 getval函数,是用来获取信号量中的数值的,参数只有一个,可以看做是这个信号量数组的下标,上例只有两个数,那么就是0或者1。也可以用 getall直接导出为数组。 setval函数就是赋值了,第一个参数为下标,第二个为要赋的值。 op函数是操作的意思,第一个函数为下标,第二个函数为要做的加加减减的操作,正为加,负为减,第三个为flag remove函数可以销毁信号量。 脚本最后并没有销毁信号量,在脚本退出后,可以执行ipcs命令,看到我们创建的这个信号量。

ipcs -s

------ Semaphore Arrays -------- key semid owner perms nsems
0x0000061c 131072 root 700 2 可以看出它的权限是700,里面有两个数值。 如果我们再执行一次脚本,因为它用了同样的key,所以并不会重新创建,而是连接到已有的信号量上,输出如下: Sem0: 9 Sem1: 110 Sem0: 10 Sem1: 100 Sem0: 9 Sem1: 110 和第一次执行,刚开始输出的0不同,这次的初始值为上次最后的值。如果我们在脚本最后加上“$sem->remove();”,那么信号量将被销毁,脚本执行后,执行ipcs,输出如下:

ipcs -s

------ Semaphore Arrays -------- key semid owner perms nsems 信号量已经被销毁了。

信号量所能存储的信息是很有限的,它最广泛的应用就是在锁上面,比如自旋锁,当一个进程进入一段资源之前,先检查特定的信号量,如果它是0,则将它置为 1,进入资源,在退出资源以后,再将它置为0,如果再进入资源前,检查信号量发现是1,那么就等待一段时间,等它成为0以后再进行操作。读写锁也一样,读锁加1,写锁减1,如果发现它不小于1,那么就将信号量加1,然后读取资源,读取完毕后减去1,写的话,只有当发现信号量等于1的时候,才能进行写操作,之前要将它减去1,成为0,阻止其它任何操作。几乎所有的锁操作,都可以用它来完成。 下面的代码是一段用信号量给DB_File多进程写操作进行加锁的过程,和用DB_File::Lock是同样的效果。 #!/usr/bin/perl

sem for lock

create by lianming: 2009-08-12

last modify by lianming: 2009-08-12

use strict qw(vars); use warnings; use DB_File; use IPC::SysV qw(IPC_PRIVATE S_IRWXU IPC_CREAT); use IPC::Semaphore;

use POSIX ":sys_wait_h"; our $zombies = 0; our $procs = 0; $SIG{CHLD} = sub { $zombies++ }; sub REAPER { my $pid; while (($pid = waitpid(-1, WNOHANG)) > 0) { $zombies --; } } my $sem = IPC::Semaphore->new(7456, 1, S_IRWXU|IPC_CREAT) || die "IPC::Semaphore->new: $!\n"; $sem->setval(0,0) || die "sem setval: $!\n"; sub child { my $sem_c = IPC::Semaphore->new(7456, 1, S_IRWXU) || die "IPC::Semaphore->new: $!\n"; my $cnt = $_[0]; my $sem_v; my %hash; my $file_name = "/opt/B_db_test/DB/test_btree"; my ($k, $v); for (my $i = 1; $i < 10; $i++) { $k = $cnt*10+$i; $v = $k*2; do { $sem_v = $sem_c->getval(0); } while ($sem_v != 0); $sem_c->op(0,1,IPC_NOWAIT); my $db = tie(%hash, 'DB_File', $file_name, O_CREAT|O_RDWR, 0666, $DB_BTREE) || die "Cannot open $file_name: $!\n"; $hash{$k} = $v; undef $db; untie %hash; $sem_c->op(0,-1,IPC_NOWAIT); } my $time_end = time(); print "$time_end: complete\n"; exit 0; } for ($procs; $procs<5; $procs++) { my $pid = fork(); if (!defined($pid)) { print "Fork Error: $!\n"; exit 1; } if ($pid == 0) { &child($procs); exit 0; } else { my $time = time(); print "$time:$procs process forked!\n"; &REAPER if ($zombies > 0); sleep(0.02); } } exit 0; 共享内存,存放数据的话,共享内存应该算是主力了,因为它相对而言更加灵活,不过在多进程操作的时候,需要额外加锁,很多情况下,这个锁都是通过信号量加的。 #!/usr/bin/perl

test share memory

create by lianming: 2009-08-14

last modify by lianming: 2009-08-14

use strict qw(vars); use warnings; use IPC::ShareLite; my $share = IPC::ShareLite->new( -key => 1971, -create => 'yes', -destroy => 'yes' ) or die $!; my $b = "Test for share memory"; $share->store($b); my $test = $share->fetch; if ($test ne "") { print "$test\n"; } 输出结果为: Test for share memory IPC::ShareLite是对共享内存的pm包进行了一次封装,使用上更加人性化吧! 创建共享内存,创建的过程是一个对哈希进行赋值的过程: 1、key,使用相同的key,可以再不同的进程中访问同一段共享内存 2、create,是否创建 3、destroy,是否销毁 4、exclusive,如果它为true的话,那么如果这段共享内存存在了,它会报错 5、mode,权限 6、size,共享内存的大小,默认为65536Byte,linux支持的最大共享内存可以通过调整内核参数来设置,位置在/proc/sys /kernel下,shmall、shmmax、shmmin(消息队列的内核参数也在此目录下调整,为msgmax、msgmnb、msgmni) store函数,就是将一个变量存入共享内存。 fetch是从共享内存中获取变量。 如果destroy设置为0的话,那么在脚本退出后,可以通过ipcs命令看到我们创建的共享内存。

ipcs -s -m

------ Shared Memory Segments -------- key shmid owner perms bytes nattch status
0x000007b3 98304 root 666 65536 0
------ Semaphore Arrays -------- key semid owner perms nsems
0x000007b3 294913 root 666 3
但是我们会发现,不仅仅是一段共享内存,它还创建了一个信号量,这个信号量和共享内存的 key是相同的,它就是用来对共享内存加锁的。ShareLite的锁做的有问题,我在调用lock和unlock的时候会报错。 Use of uninitialized value in subroutine entry at /usr/lib/perl5/site_perl/5.8.5/i386-linux-thread-multi/IPC/ShareLite.pm line 361. 不过,既然是通过信号量加锁,那倒是也无所谓,自己多写两行代码罢了。 还有一个比较麻烦的地方,就是它无法直接存储数组和哈希,存储数组和哈希要通过另外一个包实现: #!/usr/bin/perl use strict qw(vars); use warnings; use IPC::ShareLite; use Storable qw(freeze thaw); my $share = IPC::ShareLite->new( -key => 1971, -create => 'yes', -destroy => 'no' ) or die $!; my @a = ("This", "is", "stored", "in", "shared", "memory"); my $b = \@a; $share->store(freeze($b)); my $test = $share->fetch; if ($test ne "") { my $str = thaw($share->fetch) || print "Wrong\n\n"; print "@{$str}\n"; } 输出结果为: This is stored in shared memory 哈希值也是一样的。 需要先包含Storable,在调用store之前,先调用freeze函数,它的参数为你想存储的哈希或者数组的引用。在调用了fetch之后,要将获取到的值作为参数调用thaw函数,还原为引用的格式,然后才能使用。

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册