2011年8月30日 星期二

[C++] Effective C++ Item 42: Understand the two meanings of typename

今天剛好遇到一個小問題, 之前剛好在翻Effective C++, 翻一下書剛好就找到了, 順便在加深一下印象~

typename除了直接使用在template裡面當然宣告template parameter的地方, 如以下情形之外

template
class Foo
{
public:
    void foo(T a);
};

當在程式內部的name跟template parameter有相依關係的時候, 這個就叫做dependent name, 在C++當中, 他預設不會幫你假設這樣的name是type, 除非你有明確的告訴parser.

所以像以下的情形, 你就必須在前面加上typename, 這樣compiler才會認為宣告的T::iterator 也是一個type, 才不會有compiler error.


template
class Foo
{
public:
    void foo(T a);
    void goo(typename T::iterator i);
private:
    typename std::vector::iterator a;
};

2011年8月28日 星期日

[Erlang] RabbitMQ rabbit_alarm module

在Erlang裡面, 我們可以很簡單的實做一個alarm handler, 他可以接收各種alarm event, 並根據event  message來做出回應, 同時Erlang不限制你只能有一個alarm handler, 你可以根據不同功能來實做相對應的alarm handler, 只要註冊不同的名字, 在你觸發事件的時候, 可以選擇要給哪些alarm handler處理.

當要觸發事件的時候, 只需要呼叫gen_event:notify, 並且在handle_event處理相對應的event即可.

在rabbitmq內部他有實做一套memory monitor的機制, 這個機制是可以運作在cluster node內部或是單一node的情況. 而這個memory monitr有兩個部份, 關於內部所有用到的queue, channel所使用的memory monitor下次在介紹, 這次的memory monitor 只監控整體的使用量, 當到達高水位的時候, 便會block新進來的connection, 並且通知cluster其他node也要開始節省使用.

所在檔案rabbit_alarm.erl & vm_memory_monitor.erl

rabbitmq 在啟動的時候, boot的階段會去spawn一個alarm process.

rabbit.erl

 88 -rabbit_boot_step({rabbit_alarm,           
 89          [{description, "alarm handler"},
 90           {mfa,         {rabbit_alarm, start, []}},
 91           {requires,    kernel_ready},
 92           {enables,     core_initialized}]}).

在rabbit_alarm 這個process啟動的時候做了以下的事情, 他會從ebin/rabbit.app 裡面你所設定的memory條件來設定並啟動另外一個monitor process 來monitor memory, 當memory到達那個條件後的時候給你alarm.


rabbit_alarm.erl

 45 start() ->
        % 新增一個新的alarm handler, 名字是rabbit_alarm
 46     ok = alarm_handler:add_alarm_handler(?MODULE, []),
 47     {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
 48     ok = case MemoryWatermark == 0 of
            % 如果沒設定就不監控
 49          true  -> ok;
            % 啟動另外一個monitor process
 50          false -> rabbit_sup:start_restartable_child(vm_memory_monitor,
 51                [MemoryWatermark])
 52     end,
 53     ok.

    % rabbit_alarm本身有maintain兩個table
    % alertees 代表發生alarm他要通知的其他node
    % alarmed_nodes 代表已經發生過alarm的node
 78 init([]) ->
 79     {ok, #alarms{alertees      = dict:new(),
 80                  alarmed_nodes = sets:new()}}.

接下來先來看他監控的方式.

vm_memory_monitor.erl

105 start_link(Args) ->
106     gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
107  
108 init([MemFraction]) ->
109     TotalMemory =
        % get_total_memory 可以查看該檔案內部
        % 針對不同OS的處理
110         case get_total_memory() of
111             unknown ->
112                 error_logger:warning_msg(
113                   "Unknown total memory size for your OS ~p. "
114                   "Assuming memory size is ~pMB.~n",
115                   [os:type(), trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/1048576)]),
116                 ?MEMORY_SIZE_FOR_UNKNOWN_OS;
117             M -> M
118         end,
119     MemLimit = get_mem_limit(MemFraction, TotalMemory),
120     error_logger:info_msg("Memory limit set to ~pMB.~n",
121                           [trunc(MemLimit/1048576)]),
        % 啟動timer, 固定時間檢查memory usage
122     TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
123     State = #state { total_memory = TotalMemory,
124                      memory_limit = MemLimit,
125                      timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
126                      timer = TRef,
127                      alarmed = false},
128     {ok, internal_update(State)}.

171 internal_update(State = #state { memory_limit = MemLimit,
172                                  alarmed = Alarmed}) ->
        % erlang VM內部目前使用的
173     MemUsed = erlang:memory(total),
174     NewAlarmed = MemUsed > MemLimit,
175     case {Alarmed, NewAlarmed} of
176         {false, true} ->
            % 如果之前還沒發出過alarm
            % 就會發出alarm, 給alarm_handler
177             emit_update_info(set, MemUsed, MemLimit),
178             alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []});
179         {true, false} ->
            % 反之則清理掉alarm
180             emit_update_info(clear, MemUsed, MemLimit),
181             alarm_handler:clear_alarm({vm_memory_high_watermark, node()});
182         _ ->
183             ok
184     end,
185     State #state {alarmed = NewAlarmed}.
186  
    % 印出log
187 emit_update_info(State, MemUsed, MemLimit) ->
188     error_logger:info_msg(
189       "vm_memory_high_watermark ~p. Memory used:~p allowed:~p~n",
190       [State, MemUsed, MemLimit]).
191  
    % 定時update memory usage state
192 start_timer(Timeout) ->
193     {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
194     TRef.

現在回到rabbitm_alarm.erl 看他怎麼處理當收到high memory watermark的alarm.

rabbit_alarm.erl

 89 handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) ->
 90     {ok, maybe_alert(fun sets:add_element/2, Node, State)};
 91  
 92 handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) ->
 93     {ok, maybe_alert(fun sets:del_element/2, Node, State)};


126 maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN,
127                      alertees      = Alertees}) ->
128     AN1 = SetFun(Node, AN),
129     BeforeSz = sets:size(AN),
130     AfterSz  = sets:size(AN1),
131     %% If we have changed our alarm state, inform the remotes.
132     IsLocal = Node =:= node(),
        % 如果是local alarm
        % 就會通知其他的remote node
133     if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true,  Alertees);
134        IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees);
135        true                               -> ok
136     end,
137     %% If the overall alarm state has changed, inform the locals.
138     case {BeforeSz, AfterSz} of
139         {0, 1} -> ok = alert_local(true,  Alertees);
140         {1, 0} -> ok = alert_local(false, Alertees);
141         {_, _} -> ok
142     end,
        % 更新alarmed_node
143     State#alarms{alarmed_nodes = AN1}.

145 alert_local(Alert, Alertees)  -> alert(Alert, Alertees, fun erlang:'=:='/2).
146      
147 alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2).
148      
    % 找出所有需要通知的node, 並且執行Node之前register的callback
149 alert(Alert, Alertees, NodeComparator) ->
150     Node = node(),                                                                                                                                                     
151     dict:fold(fun (Pid, {M, F, A}, ok) ->
152                       case NodeComparator(Node, node(Pid)) of
153                           true  -> apply(M, F, A ++ [Pid, Alert]);
154                           false -> ok
155                       end
156               end, ok, Alertees).

所以之後其他的module, 只需要register callback, 當memory用量過高, 要怎麼處理就行, 這部份的code 是放在rabbit_reader.erl, 也就是當connection coming的時候.

rabbit_reader.erl

691     State1 = internal_conserve_memory(
           % register callback
           % 當記憶體使用過量
           % 便會呼叫
692        rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
693        State#v1{connection_state = running,
694                connection = NewConnection}),

    % 當使用過量, state改為blocking
    % 不在接收新的connection
348 internal_conserve_memory(true,  State = #v1{connection_state = running}) ->
349     State#v1{connection_state = blocking};
350 internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->                                                                                           
351     State#v1{connection_state = running};
352 internal_conserve_memory(false, State = #v1{connection_state = blocked,
353                                             heartbeater      = Heartbeater}) ->
354     ok = rabbit_heartbeat:resume_monitor(Heartbeater),
355     State#v1{connection_state = running};
356 internal_conserve_memory(_Conserve, State) ->
357     State

另外當新的cluster node 啟動的時候, 假設現在Node A收到Node B up的event, Node A會跟Node B註冊一個remote_conserve_memory callback, 當Node B 記憶體過大, 他就會呼叫Node A 開始節省memory, 直到原本的alarm被clear.

rabbit_alarm.erl

 95 handle_event({node_up, Node}, State) ->         
 96     %% Must do this via notify and not call to avoid possible deadlock.
 97     ok = gen_event:notify(                      
 98            {alarm_handler, Node},               
 99            {register, self(), {?MODULE, remote_conserve_memory, []}}),
100     {ok, State};

 69 remote_conserve_memory(Pid, true) ->
        % 叫自己開始節省記憶體
 70     gen_event:notify({alarm_handler, node(Pid)},
 71                      {set_alarm, {{vm_memory_high_watermark, node()}, []}});

2011年8月26日 星期五

[Erlang] RabbitMQ cluster node monitor module

在RabbitMQ啟動的時候, 會預先執行一些boot steps, 其中有一步就是會啟動rabbitnodemonitor.


rabbit.erl (program entry point)

% 先啟動node_monitor這個process
114 -rabbit_boot_step({rabbit_node_monitor,
115 [{description, "node monitor"},
116 {mfa, {rabbit_sup, start_restartable_child,
117 [rabbit_node_monitor]}},
118 {requires, kernel_ready},
119 {enables, core_initialized}]}).

所以實際在monitor cluster node的module是在rabbitnodemonitor.erl檔案裡面.

在erlang的世界當中, 其實要組成cluster是相當容易的, 每一個node只要在啟動的時候, 設定node的名字(cluster內部不重複), 並且搭配同樣的cookie, 彼此之間就可以互相溝通.

所以在nodemonitor 這個process起來的時候他會呼叫init function, 而netkernel:monitor_nodes, 代表他會接收所有他監控node的status message.


rabbitnodemonitor.erl

init([]) ->
ok = net_kernel:monitor_nodes(true),
{ok, no_state}.

到這時候, node_monitor就做好了初始化的動作, 接下來就要開始cluster內部node資料傳遞, 所以boot steps後面的動作就是notify cluster


rabbit.erl (program entry point)

155 -rabbit_boot_step({notify_cluster,
156 [{description, "notify cluster nodes"},
157 {mfa, {rabbit_node_monitor, notify_cluster, []}},
158 {requires, networking}]}).

在51行, 可以看到把自己的資訊multicast出去


rabbitnodemonitor.erl

% 接下來在收集整個cluster的information, 並且把自己的資料傳出去, 讓cluster的其他node知道
47 notify_cluster() ->
48 Node = node(), % 這是代表自己的node
% 這邊因為每個rabbitmq 起來, 都會有自己的mnesia, 所以可以藉由這個來收集已經起來的rabbitmq node
49 Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
50 %% notify other rabbits of this rabbit
51 case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
52 [Node], ?RABBIT_UP_RPC_TIMEOUT) of
53 {_, [] } -> ok;
54 {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
55 end,
56 %% register other active rabbits with this rabbit
57 [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ],
58 ok.

而接下來的程式就是當monitor_node這個process根據收到各種status message所做的處理


rabbitnodemonitor.erl

66 handle_call(_Request, _From, State) ->
67 {noreply, State}.
68
69 handle_cast({rabbit_running_on, Node}, State) ->
70 rabbit_log:info("node ~p up~n", [Node]),
% 收到新的node, 所以納入監控
71 erlang:monitor(process, {rabbit, Node}),
% 呼叫自己的event handler 處理新啟動的node
72 ok = rabbit_alarm:on_node_up(Node),
73 {noreply, State};
74 handle_cast(_Msg, State) ->
75 {noreply, State}.
76
77 handle_info({nodedown, Node}, State) ->
% 收到有監控的node 正常掛掉了
78 rabbit_log:info("node ~p down~n", [Node]),
79 ok = handle_dead_rabbit(Node),
80 {noreply, State};
81 handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
% 收到突然掛掉的node
82 rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
83 ok = handle_dead_rabbit(Node),
84 {noreply, State};

96 %% TODO: This may turn out to be a performance hog when there are lots
97 %% of nodes. We really only need to execute some of these statements
98 %% on *one* node, rather than all of them.
99 handle_dead_rabbit(Node) ->
100 ok = rabbit_networking:on_node_down(Node),
101 ok = rabbit_amqqueue:on_node_down(Node),
102 ok = rabbit_alarm:on_node_down(Node).

接下來針對有新node啟動的情形來看一下他怎麼處理, 他會呼叫 rabbitalarm 這個module來處理, 在這裡的flow有點不太一樣, 要分成兩個部份來看, 現在的情形是Node A跟Node B, Node A收到Node B nodeup 的message, 所以Node A呼叫自己的 rabbit_alarm module 來處理, 進入這邊之後, Node A沒作甚饃特別的事情(反正Node A已經有monitor Node B了), 但是他會送一個{register, self(), ...} event給Node B去處理.


rabbit_alarm.erl on Node A

95 handle_event({node_up, Node}, State) ->
96 %% Must do this via notify and not call to avoid possible deadlock.
97 ok = gen_event:notify(
98 {alarm_handler, Node},
99 {register, self(), {?MODULE, remote_conserve_memory, []}}),
100 {ok, State};

接著我們來看Node B這邊做了甚饃事情~ 他接著也會monitor Node A, 並且看Node A是不是有在他的alarmednodes裡面, 如果有的話, 就會執行傳進來的Module, Function, Arguments, 就是rabbitalarm.erl 69-71行做的事情, 接著再把Node A加進alertees裡面(之後有alert發生要通知的對象之一), 這樣子就完成加入新Node的動作.


rabbit_alarm.erl on Node B

67 %% Can't use alarm_handler:{set,clear}_alarm because that doesn't
68 %% permit notifying a remote node.
% 在送set_alarm event 回給 Node A
69 remote_conserve_memory(Pid, true) ->
70 gen_event:notify({alarm_handler, node(Pid)},
71 {set_alarm, {{vm_memory_high_watermark, node()}, []}});
72 remote_conserve_memory(Pid, false) ->
73 gen_event:notify({alarm_handler, node(Pid)},
74 {clear_alarm, {vm_memory_high_watermark, node()}}).

% 收到Node A的 event
82 handle_call({register, Pid, HighMemMFA}, State) ->
83 {ok, 0 < sets:size(State#alarms.alarmed_nodes),
84 internal_register(Pid, HighMemMFA, State)};

% 首先M, F, A 分別是 ?MODULE, remote_conserve_memory, []
158 internal_register(Pid, {M, F, A} = HighMemMFA,
159 State = #alarms{alertees = Alertees}) ->
160 _MRef = erlang:monitor(process, Pid),
161 case sets:is_element(node(), State#alarms.alarmed_nodes) of
162 true -> ok = apply(M, F, A ++ [Pid, true]);
163 false -> ok
164 end,
165 NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
166 State#alarms{alertees = NewAlertees}.

所以其實monitor cluster node的部份在erlang的實做是相當的簡單~

至於關於rabbit_alarm.erl的部份, 裡面有關於monitor memory部份, 留到下次再說~

[Linux] system info

Use sudo dmidecode -t memory to see how much memory capacity in your machine. There are some more information about hardware information can be found by using this command.

2011年8月23日 星期二

[Erlang] Prime generator

好久沒寫erlang, 有點生疏了~ 剛好看到有人在講prime generator, 就寫了一個~:P


-module(prime).

-export([generate_prime/1]).

filter_number(Pid, P) ->
receive
N when N rem P /= 0 ->
% In this case, the N can not be divided by P,
% so pass it to next prime process
Pid ! N;
_N -> ok
end,
filter_number(Pid, P).

prime() ->
Prime = receive P -> P end,
io:format("Prime ~p~n", [Prime]),
Pid = spawn(fun prime/0),
filter_number(Pid, P).

generate_prime(N) ->
% create the first prime process
Pid = spawn(fun prime/0),
lists:map(fun(X) -> Pid ! X end, lists:seq(2, N)),
ok.

2011年8月18日 星期四

[GDB] Reverse step for GDB

從GDB 7.0之後就support Process record 也就是所謂的"reverse debugging", 一般而言我們在gdb上面可以執行step, continue, next這些指令, 但是一旦走到下一步之後, 就無法回到下一步~ "reverse debugging" 就是可以還原到上一步, 或是還原上一次的breakpoint. 目前這個功能有平台上的限制, 只能支援以下平台

  • i386-linux
  • amd64-linux
  • moxie-elf / moxie-linux

他底層其實就是gdb 會把你每一步執行的指令logging起來,搭配memory跟register的狀態, 所以才能成功還原上一步. 

假設我有下面的程式

#include
#include

int main()
{
int i;

for(i=0;i<100;i++) {
int t =
printf("i = %d, t = %d\n", i, t);
}
return 0;
}

現在開始進行gdb debug

$ gcc -g tests.c # compile code
$ gdb a.out # start to debug
(gdb) r
The program being debugged has been started already.
Start it from the beginning? (y or n) y
Starting program: /home/ytshen/a.out

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 0
1: i = 0
(gdb) record # 代表我們開始紀錄, 之後才能使用 reverse-step或是相關指令
(gdb) c
Continuing.
ci = 0, t = 1804289383

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1804289383
1: i = 1
(gdb) c
Continuing.
i = 1, t = 846930886

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 846930886
1: i = 2
(gdb) c
Continuing.
i = 2, t = 1681692777

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1681692777
1: i = 3
(gdb) c
Continuing.
i = 3, t = 1714636915

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1714636915
1: i = 4
# 接下來開始進行reverse
(gdb) reverse-continue
Continuing.

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1681692777
1: i = 3
# 可以看到所有的state都被保留下來
(gdb) reverse-continue
Continuing.

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 846930886
1: i = 2
(gdb) reverse-continue
Continuing.

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1804289383
1: i = 1
(gdb)

上面就是一個簡單的例子, 利用gdb來進行reverse debugging, 他還有一些簡單的指令
ex:

  • "record stop": 停止紀錄指令
  • "record delete": 刪除之前的紀錄
  • "info record": 
  • "set record stop-at-limit": 設定logging buffer滿之後的行為, 如果是on, 在buffer滿了就會停下來, off則會蓋掉舊得logging指令
  • "set record insn-number-max": 設定紀錄指令的大小, 預設是3292
  • "reverse-step": 上一步, 如果是function call則會進入function開頭
  • "reverse-continue": 還原到上一個break point
  • "reverse-next": 上一步, 但是如果是function call會直接執行完到return
  • "set exec-direction [forward | reverse]: 設定好方向之後, 可以直接使用continue, next, step

Reference

 

2011年8月2日 星期二

[C] Low level IO primitives

Low level IO like open/write/close should call fdatasync() when you want to make sure data is write to disk, because this still keep some data buffered in file descriptor.

And about stdio library like fopen/fwrite/fclose should call fflush() before fdatasync(), because stdio library keep some data buffered in the FILE structure.