Branch data Line data Source code
1 : : // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 : : // vim: ts=8 sw=2 smarttab
3 : : /*
4 : : * Ceph - scalable distributed file system
5 : : *
6 : : * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 : : *
8 : : * This is free software; you can redistribute it and/or
9 : : * modify it under the terms of the GNU Lesser General Public
10 : : * License version 2.1, as published by the Free Software
11 : : * Foundation. See file COPYING.
12 : : *
13 : : */
14 : :
15 : :
16 : : #include <sstream>
17 : : #include <stdlib.h>
18 : : #include <signal.h>
19 : : #include <limits.h>
20 : :
21 : : #include "Monitor.h"
22 : : #include "common/version.h"
23 : :
24 : : #include "osd/OSDMap.h"
25 : :
26 : : #include "MonitorStore.h"
27 : :
28 : : #include "msg/Messenger.h"
29 : :
30 : : #include "messages/PaxosServiceMessage.h"
31 : : #include "messages/MMonMap.h"
32 : : #include "messages/MMonGetMap.h"
33 : : #include "messages/MMonGetVersion.h"
34 : : #include "messages/MMonGetVersionReply.h"
35 : : #include "messages/MGenericMessage.h"
36 : : #include "messages/MMonCommand.h"
37 : : #include "messages/MMonCommandAck.h"
38 : : #include "messages/MMonProbe.h"
39 : : #include "messages/MMonJoin.h"
40 : : #include "messages/MMonPaxos.h"
41 : : #include "messages/MRoute.h"
42 : : #include "messages/MForward.h"
43 : :
44 : : #include "messages/MMonSubscribe.h"
45 : : #include "messages/MMonSubscribeAck.h"
46 : :
47 : : #include "messages/MAuthReply.h"
48 : :
49 : : #include "messages/MTimeCheck.h"
50 : :
51 : : #include "common/strtol.h"
52 : : #include "common/ceph_argparse.h"
53 : : #include "common/Timer.h"
54 : : #include "common/Clock.h"
55 : : #include "common/errno.h"
56 : : #include "common/perf_counters.h"
57 : : #include "common/admin_socket.h"
58 : :
59 : : #include "include/color.h"
60 : : #include "include/ceph_fs.h"
61 : : #include "include/str_list.h"
62 : :
63 : : #include "OSDMonitor.h"
64 : : #include "MDSMonitor.h"
65 : : #include "MonmapMonitor.h"
66 : : #include "PGMonitor.h"
67 : : #include "LogMonitor.h"
68 : : #include "AuthMonitor.h"
69 : :
70 : : #include "auth/AuthMethodList.h"
71 : : #include "auth/KeyRing.h"
72 : :
73 : : #include "common/config.h"
74 : : #include "include/assert.h"
75 : :
76 : : #define dout_subsys ceph_subsys_mon
77 : : #undef dout_prefix
78 : : #define dout_prefix _prefix(_dout, this)
79 : 66 : static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
80 : 66 : return *_dout << "mon." << mon->name << "@" << mon->rank
81 : 132 : << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
82 : : }
83 : :
84 : 1 : long parse_pos_long(const char *s, ostream *pss)
85 : : {
86 [ - + ]: 1 : if (*s == '-' || *s == '+') {
87 : 0 : *pss << "expected numerical value, got: " << s;
88 : 0 : return -EINVAL;
89 : : }
90 : :
91 : : string err;
92 [ + - ]: 1 : long r = strict_strtol(s, 10, &err);
93 [ - + ][ # # ]: 1 : if ((r == 0) && !err.empty()) {
[ - + ]
94 [ # # ]: 0 : if (pss)
95 [ # # ]: 0 : *pss << err;
96 : : return -1;
97 : : }
98 [ - + ]: 1 : if (r < 0) {
99 [ # # ]: 0 : if (pss)
100 [ # # ][ # # ]: 0 : *pss << "unable to parse positive integer '" << s << "'";
[ # # ]
101 : : return -1;
102 : : }
103 : 1 : return r;
104 : : }
105 : :
106 : :
// Construct a monitor named `nm` on top of the given store, messenger and
// monmap (all passed as raw pointers and stored as-is).
//
// The monitor starts in STATE_PROBING with rank -1.  One Paxos instance and
// one PaxosService are created for each PAXOS_* slot; both are deleted in
// ~Monitor().  mon_caps is allocated here with "allow *".
Monitor::Monitor(CephContext* cct_, string nm, MonitorStore *s, Messenger *m, MonMap *map) :
  Dispatcher(cct_),
  name(nm),
  rank(-1),
  messenger(m),
  lock("Monitor::lock"),
  timer(cct_, lock),
  has_ever_joined(false),
  logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
  monmap(map),
  clog(cct_, messenger, monmap, LogClient::FLAG_MON),
  key_server(cct, &keyring),
  // a non-empty auth_supported setting takes precedence over the
  // cluster/service-specific auth settings
  auth_cluster_required(cct,
                        cct->_conf->auth_supported.length() ?
                        cct->_conf->auth_supported : cct->_conf->auth_cluster_required),
  auth_service_required(cct,
                        cct->_conf->auth_supported.length() ?
                        cct->_conf->auth_supported : cct->_conf->auth_service_required),
  store(s),

  state(STATE_PROBING),

  elector(this),
  leader(0),
  quorum_features(0),

  timecheck_epoch(0),
  timecheck_round(0),
  timecheck_event(NULL),

  probe_timeout_event(NULL),

  paxos_service(PAXOS_NUM),
  admin_hook(NULL),
  routed_request_tid(0)
{
  // NOTE(review): rank is already set to -1 in the initializer list above.
  rank = -1;

  // each add_paxos() call appends to the `paxos` list, so the order of the
  // statements below is the order of the list
  paxos_service[PAXOS_PGMAP] = new PGMonitor(this, add_paxos(PAXOS_PGMAP));
  paxos_service[PAXOS_OSDMAP] = new OSDMonitor(this, add_paxos(PAXOS_OSDMAP));
  // mdsmap should be added to the paxos vector after the osdmap
  paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, add_paxos(PAXOS_MDSMAP));
  paxos_service[PAXOS_LOG] = new LogMonitor(this, add_paxos(PAXOS_LOG));
  paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, add_paxos(PAXOS_MONMAP));
  paxos_service[PAXOS_AUTH] = new AuthMonitor(this, add_paxos(PAXOS_AUTH));

  // capability set granting everything
  mon_caps = new MonCaps();
  mon_caps->set_allow_all(true);
  mon_caps->text = "allow *";

  // we start outside of any quorum
  exited_quorum = ceph_clock_now(g_ceph_context);
}
159 : :
160 : 36 : Paxos *Monitor::add_paxos(int type)
161 : : {
162 : 36 : Paxos *p = new Paxos(this, type);
163 : 36 : paxos.push_back(p);
164 : 36 : return p;
165 : : }
166 : :
167 : 911 : Paxos *Monitor::get_paxos_by_name(const string& name)
168 : : {
169 [ + - ]: 3962 : for (list<Paxos*>::iterator p = paxos.begin();
170 : 6102 : p != paxos.end();
171 : : ++p) {
172 [ + + ]: 3051 : if ((*p)->machine_name == name)
173 : 911 : return *p;
174 : : }
175 : : return NULL;
176 : : }
177 : :
178 : 0 : PaxosService *Monitor::get_paxos_service_by_name(const string& name)
179 : : {
180 [ # # ]: 0 : if (name == "mdsmap")
181 : 0 : return paxos_service[PAXOS_MDSMAP];
182 [ # # ]: 0 : if (name == "monmap")
183 : 0 : return paxos_service[PAXOS_MONMAP];
184 [ # # ]: 0 : if (name == "osdmap")
185 : 0 : return paxos_service[PAXOS_OSDMAP];
186 [ # # ]: 0 : if (name == "pgmap")
187 : 0 : return paxos_service[PAXOS_PGMAP];
188 [ # # ]: 0 : if (name == "logm")
189 : 0 : return paxos_service[PAXOS_LOG];
190 [ # # ]: 0 : if (name == "auth")
191 : 0 : return paxos_service[PAXOS_AUTH];
192 : :
193 : 0 : assert(0 == "given name does not match known paxos service");
194 : : return NULL;
195 : : }
196 : :
197 [ + - ][ + - ]: 30 : Monitor::~Monitor()
[ + - + - ]
[ + - ][ + - ]
198 : : {
199 [ + + ]: 42 : for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
200 [ + - ][ + - ]: 36 : delete *p;
201 [ + + ]: 42 : for (list<Paxos*>::iterator p = paxos.begin(); p != paxos.end(); p++)
202 [ + - ]: 72 : delete *p;
203 [ - + ]: 6 : assert(session_map.sessions.empty());
204 [ + - ]: 6 : delete mon_caps;
205 : 9 : }
206 : :
207 : 30 : void Monitor::recovered_leader(int id)
208 : : {
209 [ - + ][ # # ]: 30 : dout(10) << "recovered_leader " << id << " " << get_paxos_name(id) << " (" << paxos_recovered << ")" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
210 [ - + ]: 30 : assert(paxos_recovered.count(id) == 0);
211 : 30 : paxos_recovered.insert(id);
212 [ + + ]: 30 : if (paxos_recovered.size() == paxos.size()) {
213 [ - + ][ # # ]: 5 : dout(10) << "all paxos instances recovered, going writeable" << dendl;
[ # # ][ # # ]
214 : :
215 [ # # ][ - + ]: 10 : if (!features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_GV) &&
[ - + ]
216 : : (quorum_features & CEPH_FEATURE_MON_GV)) {
217 : 0 : require_gv_ondisk();
218 : 0 : require_gv_onwire();
219 : : }
220 : :
221 [ + + ]: 35 : for (list<Paxos*>::iterator p = paxos.begin(); p != paxos.end(); p++) {
222 [ - + ]: 30 : if (!(*p)->is_active())
223 : 0 : continue;
224 : 30 : finish_contexts(g_ceph_context, (*p)->waiting_for_active);
225 : : }
226 [ + + ]: 35 : for (list<Paxos*>::iterator p = paxos.begin(); p != paxos.end(); p++) {
227 [ + + ]: 30 : if (!(*p)->is_active())
228 : 8 : continue;
229 : 22 : finish_contexts(g_ceph_context, (*p)->waiting_for_commit);
230 : : }
231 [ + + ]: 35 : for (list<Paxos*>::iterator p = paxos.begin(); p != paxos.end(); p++) {
232 [ + + ]: 30 : if (!(*p)->is_readable())
233 : 6 : continue;
234 : 24 : finish_contexts(g_ceph_context, (*p)->waiting_for_readable);
235 : : }
236 [ + + ]: 35 : for (list<Paxos*>::iterator p = paxos.begin(); p != paxos.end(); p++) {
237 [ + + ]: 30 : if (!(*p)->is_writeable())
238 : 10 : continue;
239 : 20 : finish_contexts(g_ceph_context, (*p)->waiting_for_writeable);
240 : : }
241 : : }
242 : 30 : }
243 : :
244 : 78 : void Monitor::recovered_peon(int id)
245 : : {
246 : : // unlike recovered_leader(), recovered_peon() can get called
247 : : // multiple times, because it is triggered by a paxos lease message,
248 : : // and the leader may send multiples of those out for a given paxos
249 : : // machine while it is waiting for another instance to recover.
250 [ + + ]: 78 : if (paxos_recovered.count(id))
251 : 78 : return;
252 [ - + ][ # # ]: 60 : dout(10) << "recovered_peon " << id << " " << get_paxos_name(id) << " (" << paxos_recovered << ")" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
253 : 60 : paxos_recovered.insert(id);
254 [ + + ]: 60 : if (paxos_recovered.size() == paxos.size()) {
255 [ - + ][ # # ]: 10 : dout(10) << "all paxos instances recovered/leased" << dendl;
[ # # ][ # # ]
256 : :
257 [ # # ][ - + ]: 20 : if (!features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_GV) &&
[ - + ]
258 : : (quorum_features & CEPH_FEATURE_MON_GV)) {
259 : 0 : require_gv_ondisk();
260 : 0 : require_gv_onwire();
261 : : }
262 : : }
263 : : }
264 : :
// Record CEPH_MON_FEATURE_INCOMPAT_GV in the in-memory incompat feature set
// and persist the updated set via write_features().
void Monitor::require_gv_ondisk()
{
  dout(0) << "setting CEPH_MON_FEATURE_INCOMPAT_GV" << dendl;
  features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_GV);
  write_features();
}
271 : :
272 : 3 : void Monitor::require_gv_onwire()
273 : : {
274 [ - + ][ # # ]: 3 : dout(10) << "require_gv_onwire" << dendl;
[ # # ][ # # ]
275 : : // require protocol feature bit of my peers
276 : 3 : Messenger::Policy p = messenger->get_policy(entity_name_t::TYPE_MON);
277 : 3 : p.features_required |= CEPH_FEATURE_MON_GV;
278 : 3 : messenger->set_policy(entity_name_t::TYPE_MON, p);
279 : 3 : }
280 : :
281 : 52 : version_t Monitor::get_global_paxos_version()
282 : : {
283 : : // this should only be called when paxos becomes writeable, which is
284 : : // *after* everything settles after an election.
285 [ - + ]: 52 : assert(is_all_paxos_recovered());
286 : :
287 [ - + ]: 52 : if ((quorum_features & CEPH_FEATURE_MON_GV) == 0) {
288 : : // do not sure issuing gv's until the entire quorum supports them.
289 : : // this way we synchronize the setting of the incompat GV ondisk
290 : : // feature with actually writing the values to the data store, and
291 : : // avoid having to worry about hybrid cases.
292 [ # # ][ # # ]: 0 : dout(10) << "get_global_paxos_version no-op; quorum does not support the feature" << dendl;
[ # # ][ # # ]
293 : : return 0;
294 : : }
295 : :
296 [ + + ]: 52 : if (global_version == 0) {
297 : : global_version =
298 : 10 : osdmon()->paxos->get_version() +
299 : 5 : mdsmon()->paxos->get_version() +
300 : 5 : monmon()->paxos->get_version() +
301 : 5 : pgmon()->paxos->get_version() +
302 : 5 : authmon()->paxos->get_version() +
303 : 30 : logmon()->paxos->get_version();
304 [ - + ][ # # ]: 5 : dout(10) << "get_global_paxos_version first call this election epoch, starting from " << global_version << dendl;
[ # # ][ # # ]
305 : : }
306 : 52 : ++global_version;
307 [ - + ][ # # ]: 52 : dout(20) << "get_global_paxos_version " << global_version << dendl;
[ # # ][ # # ]
308 : 52 : return global_version;
309 : : }
310 : :
// Index range handed to the "mon" PerfCountersBuilder in preinit().  No
// counters are defined between the two bounds here.
enum {
  l_mon_first = 456000,
  l_mon_last,
};
315 : :
316 : :
317 : 3 : class AdminHook : public AdminSocketHook {
318 : : Monitor *mon;
319 : : public:
320 : 6 : AdminHook(Monitor *m) : mon(m) {}
321 : 0 : bool call(std::string command, std::string args, bufferlist& out) {
322 : 0 : stringstream ss;
323 [ # # ][ # # ]: 0 : mon->do_admin_command(command, args, ss);
[ # # ]
324 [ # # ]: 0 : out.append(ss);
325 : 0 : return true;
326 : : }
327 : : };
328 : :
329 : 0 : void Monitor::do_admin_command(string command, string args, ostream& ss)
330 : : {
331 : 0 : Mutex::Locker l(lock);
332 [ # # ][ # # ]: 0 : if (command == "mon_status")
333 [ # # ]: 0 : _mon_status(ss);
334 [ # # ][ # # ]: 0 : else if (command == "quorum_status")
335 [ # # ]: 0 : _quorum_status(ss);
336 [ # # ][ # # ]: 0 : else if (command.find("add_bootstrap_peer_hint") == 0)
337 [ # # ][ # # ]: 0 : _add_bootstrap_peer_hint(command, args, ss);
[ # # ]
338 : : else
339 : 0 : assert(0 == "bad AdminSocket command binding");
340 : 0 : }
341 : :
342 : 3 : void Monitor::handle_signal(int signum)
343 : : {
344 [ - + ]: 3 : assert(signum == SIGINT || signum == SIGTERM);
345 [ + - ][ + - ]: 6 : derr << "*** Got Signal " << sys_siglist[signum] << " ***" << dendl;
[ + - ][ + - ]
[ + - ][ + - ]
346 : 3 : shutdown();
347 : 3 : }
348 : :
349 : 6 : CompatSet Monitor::get_supported_features()
350 : : {
351 : : CompatSet::FeatureSet ceph_mon_feature_compat;
352 : : CompatSet::FeatureSet ceph_mon_feature_ro_compat;
353 : : CompatSet::FeatureSet ceph_mon_feature_incompat;
354 [ + - ]: 6 : ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
355 [ + - ]: 6 : ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_GV);
356 : : return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
357 [ + - ]: 6 : ceph_mon_feature_incompat);
358 : : }
359 : :
360 : 0 : CompatSet Monitor::get_legacy_features()
361 : : {
362 : : CompatSet::FeatureSet ceph_mon_feature_compat;
363 : : CompatSet::FeatureSet ceph_mon_feature_ro_compat;
364 : : CompatSet::FeatureSet ceph_mon_feature_incompat;
365 [ # # ]: 0 : ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
366 : : return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
367 [ # # ]: 0 : ceph_mon_feature_incompat);
368 : : }
369 : :
370 : 3 : int Monitor::check_features(MonitorStore *store)
371 : : {
372 : 3 : CompatSet required = get_supported_features();
373 : 3 : CompatSet ondisk;
374 : :
375 [ + - ]: 3 : bufferlist features;
376 [ + - ]: 3 : store->get_bl_ss_safe(features, COMPAT_SET_LOC, 0);
377 [ - + ]: 3 : if (features.length() == 0) {
378 [ # # ][ # # ]: 0 : generic_dout(0) << "WARNING: mon fs missing feature list.\n"
[ # # ][ # # ]
[ # # ]
379 [ # # ][ # # ]: 0 : << "Assuming it is old-style and introducing one." << dendl;
[ # # ]
380 : : //we only want the baseline ~v.18 features assumed to be on disk.
381 : : //If new features are introduced this code needs to disappear or
382 : : //be made smarter.
383 [ # # ][ # # ]: 0 : ondisk = get_legacy_features();
384 : :
385 [ # # ]: 0 : bufferlist bl;
386 [ # # ]: 0 : ondisk.encode(bl);
387 [ # # ][ # # ]: 0 : store->put_bl_ss(bl, COMPAT_SET_LOC, 0);
388 : : } else {
389 [ + - ]: 3 : bufferlist::iterator it = features.begin();
390 [ + - ]: 3 : ondisk.decode(it);
391 : : }
392 : :
393 [ - + ]: 3 : if (!required.writeable(ondisk)) {
394 [ # # ]: 0 : CompatSet diff = required.unsupported(ondisk);
395 [ # # ][ # # ]: 0 : generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
396 : 0 : return -EPERM;
397 : : }
398 : :
399 [ + - ]: 3 : return 0;
400 : : }
401 : :
402 : 3 : void Monitor::read_features()
403 : : {
404 : 3 : bufferlist bl;
405 [ + - ]: 3 : store->get_bl_ss_safe(bl, COMPAT_SET_LOC, 0);
406 [ - + ]: 3 : assert(bl.length());
407 : :
408 [ + - ]: 3 : bufferlist::iterator p = bl.begin();
409 : 3 : ::decode(features, p);
410 [ + - ][ - + ]: 3 : dout(10) << "features " << features << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
411 : :
412 [ + - ]: 6 : if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_GV))
413 [ + - ]: 3 : require_gv_onwire();
414 : 3 : }
415 : :
// Encode the in-memory `features` set and store it at COMPAT_SET_LOC.
void Monitor::write_features()
{
  bufferlist bl;
  features.encode(bl);
  store->put_bl_ss(bl, COMPAT_SET_LOC, 0);
}
422 : :
// First-stage initialisation, run before init().
//
// Creates the perf counters, verifies (or writes) the fsid, loads the
// feature set, optionally filters the seed monmap by mon_initial_members,
// initialises every paxos instance, bootstraps the keyring and registers
// the admin socket commands.
//
// Returns 0 on success or a negative error code if the fsid check/write or
// the keyring load fails.  The monitor lock is taken on entry and is
// released again on every return path.
int Monitor::preinit()
{
  lock.Lock();

  dout(1) << "preinit fsid " << monmap->fsid << dendl;

  assert(!logger);
  {
    PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
    // ...
    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  // the cluster logger is only created here; it is added to the perf
  // counters collection by register_cluster_logger()
  assert(!cluster_logger);
  {
    PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
    pcb.add_u64(l_cluster_num_mon, "num_mon");
    pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum");
    pcb.add_u64(l_cluster_num_osd, "num_osd");
    pcb.add_u64(l_cluster_num_osd_up, "num_osd_up");
    pcb.add_u64(l_cluster_num_osd_in, "num_osd_in");
    pcb.add_u64(l_cluster_osd_epoch, "osd_epoch");
    pcb.add_u64(l_cluster_osd_kb, "osd_kb");
    pcb.add_u64(l_cluster_osd_kb_used, "osd_kb_used");
    pcb.add_u64(l_cluster_osd_kb_avail, "osd_kb_avail");
    pcb.add_u64(l_cluster_num_pool, "num_pool");
    pcb.add_u64(l_cluster_num_pg, "num_pg");
    pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean");
    pcb.add_u64(l_cluster_num_pg_active, "num_pg_active");
    pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering");
    pcb.add_u64(l_cluster_num_object, "num_object");
    pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded");
    pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound");
    pcb.add_u64(l_cluster_num_bytes, "num_bytes");
    pcb.add_u64(l_cluster_num_mds_up, "num_mds_up");
    pcb.add_u64(l_cluster_num_mds_in, "num_mds_in");
    pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed");
    pcb.add_u64(l_cluster_mds_epoch, "mds_epoch");
    cluster_logger = pcb.create_perf_counters();
  }

  // verify cluster_uuid
  {
    int r = check_fsid();
    // -ENOENT from check_fsid() leads to the fsid being written instead
    if (r == -ENOENT)
      r = write_fsid();
    if (r < 0) {
      lock.Unlock();
      return r;
    }
  }

  // open compatset
  read_features();

  // have we ever joined a quorum?
  has_ever_joined = store->exists_bl_ss("joined");
  dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;

  if (!has_ever_joined) {
    // impose initial quorum restrictions?
    list<string> initial_members;
    get_str_list(g_conf->mon_initial_members, initial_members);

    if (initial_members.size()) {
      dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;

      // extra_probe_peers is passed by address, so this call can update it
      monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(),
                                  &extra_probe_peers);

      dout(10) << " monmap is " << *monmap << dendl;
    }
  }

  // init paxos
  for (list<Paxos*>::iterator it = paxos.begin(); it != paxos.end(); ++it) {
    (*it)->init();
    if ((*it)->is_consistent()) {
      // machine_id is used as the index of the matching paxos service
      int i = (*it)->machine_id;
      paxos_service[i]->update_from_paxos();
    } // else we don't do anything; handle_probe_reply will detect it's slurping
  }

  // we need to bootstrap authentication keys so we can form an
  // initial quorum.
  if (authmon()->paxos->get_version() == 0) {
    dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
    bufferlist bl;
    store->get_bl_ss_safe(bl, "mkfs", "keyring");
    // NOTE(review): this local shadows the `keyring` used with load()/add()
    // below; it only feeds extract_save_mon_key().
    KeyRing keyring;
    bufferlist::iterator p = bl.begin();
    ::decode(keyring, p);
    extract_save_mon_key(keyring);
  }

  // load the keyring file from the mon data directory
  ostringstream os;
  os << g_conf->mon_data << "/keyring";
  int r = keyring.load(cct, os.str());
  if (r < 0) {
    // no external keyring: fall back to the mon. key held by key_server,
    // if there is one, and write it out
    EntityName mon_name;
    mon_name.set_type(CEPH_ENTITY_TYPE_MON);
    EntityAuth mon_key;
    if (key_server.get_auth(mon_name, mon_key)) {
      dout(1) << "copying mon. key from old db to external keyring" << dendl;
      keyring.add(mon_name, mon_key);
      bufferlist bl;
      keyring.encode_plaintext(bl);
      store->put_bl_ss(bl, "keyring", NULL);
    } else {
      derr << "unable to load initial keyring " << g_conf->keyring << dendl;
      lock.Unlock();
      return r;
    }
  }

  admin_hook = new AdminHook(this);
  AdminSocket* admin_socket = cct->get_admin_socket();

  // unlock while registering to avoid mon_lock -> admin socket lock dependency.
  lock.Unlock();
  r = admin_socket->register_command("mon_status", admin_hook,
                                     "show current monitor status");
  assert(r == 0);
  r = admin_socket->register_command("quorum_status", admin_hook,
                                     "show current quorum status");
  assert(r == 0);
  r = admin_socket->register_command("add_bootstrap_peer_hint", admin_hook,
                                     "add peer address as potential bootstrap peer for cluster bringup");
  assert(r == 0);
  lock.Lock();

  // NOTE(review): the lock is re-taken above and released immediately;
  // nothing runs in between.
  lock.Unlock();
  return 0;
}
558 : :
// Second-stage initialisation: under the monitor lock, start the timer and
// the tick, register this monitor as a dispatcher on the messenger, and
// call bootstrap().  Always returns 0.
int Monitor::init()
{
  dout(2) << "init" << dendl;
  lock.Lock();

  // start ticker
  timer.init();
  new_tick();

  // i'm ready!
  messenger->add_dispatcher_tail(this);

  bootstrap();

  lock.Unlock();
  return 0;
}
576 : :
577 : 17 : void Monitor::register_cluster_logger()
578 : : {
579 [ + + ]: 17 : if (!cluster_logger_registered) {
580 [ - + ][ # # ]: 12 : dout(10) << "register_cluster_logger" << dendl;
[ # # ][ # # ]
581 : 12 : cluster_logger_registered = true;
582 : 12 : cct->get_perfcounters_collection()->add(cluster_logger);
583 : : } else {
584 [ - + ][ # # ]: 5 : dout(10) << "register_cluster_logger - already registered" << dendl;
[ # # ][ # # ]
585 : : }
586 : 17 : }
587 : :
588 : 15 : void Monitor::unregister_cluster_logger()
589 : : {
590 [ + + ]: 15 : if (cluster_logger_registered) {
591 [ - + ][ # # ]: 9 : dout(10) << "unregister_cluster_logger" << dendl;
[ # # ][ # # ]
592 : 9 : cluster_logger_registered = false;
593 : 9 : cct->get_perfcounters_collection()->remove(cluster_logger);
594 : : } else {
595 [ - + ][ # # ]: 6 : dout(10) << "unregister_cluster_logger - not registered" << dendl;
[ # # ][ # # ]
596 : : }
597 : 15 : }
598 : :
599 : 17 : void Monitor::update_logger()
600 : : {
601 : 17 : cluster_logger->set(l_cluster_num_mon, monmap->size());
602 : 17 : cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
603 : 17 : }
604 : :
605 : 3 : void Monitor::shutdown()
606 : : {
607 [ + - ][ + - ]: 6 : dout(1) << "shutdown" << dendl;
[ + - ][ + - ]
608 : 3 : lock.Lock();
609 : :
610 : 3 : state = STATE_SHUTDOWN;
611 : :
612 [ + - ]: 3 : if (admin_hook) {
613 : 3 : AdminSocket* admin_socket = cct->get_admin_socket();
614 [ + - ]: 3 : admin_socket->unregister_command("mon_status");
615 [ + - ]: 3 : admin_socket->unregister_command("quorum_status");
616 [ + - ]: 3 : delete admin_hook;
617 : 3 : admin_hook = NULL;
618 : : }
619 : :
620 : 3 : elector.shutdown();
621 : :
622 [ + - ]: 3 : if (logger) {
623 : 3 : cct->get_perfcounters_collection()->remove(logger);
624 [ + - ]: 3 : delete logger;
625 : 3 : logger = NULL;
626 : : }
627 [ + - ]: 3 : if (cluster_logger) {
628 [ + - ]: 3 : if (cluster_logger_registered)
629 : 3 : cct->get_perfcounters_collection()->remove(cluster_logger);
630 [ + - ]: 3 : delete cluster_logger;
631 : 3 : cluster_logger = NULL;
632 : : }
633 : :
634 : : // clean up
635 [ + + ]: 21 : for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
636 : 18 : (*p)->shutdown();
637 : :
638 : 3 : finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
639 : 3 : finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
640 : :
641 : :
642 : 3 : timer.shutdown();
643 : :
644 : : // unlock before msgr shutdown...
645 : 3 : lock.Unlock();
646 : :
647 : 3 : remove_all_sessions();
648 : 3 : messenger->shutdown(); // last thing! ceph_mon.cc will delete mon.
649 : 3 : }
650 : :
651 : 15 : void Monitor::bootstrap()
652 : : {
653 [ - + ][ # # ]: 15 : dout(10) << "bootstrap" << dendl;
[ # # ][ # # ]
654 : :
655 : 15 : unregister_cluster_logger();
656 : 15 : cancel_probe_timeout();
657 : :
658 : : // note my rank
659 : 30 : int newrank = monmap->get_rank(messenger->get_myaddr());
660 [ - + ][ # # ]: 15 : if (newrank < 0 && rank >= 0) {
661 : : // was i ever part of the quorum?
662 [ # # ]: 0 : if (has_ever_joined) {
663 [ # # ][ # # ]: 0 : dout(0) << " removed from monmap, suicide." << dendl;
[ # # ][ # # ]
664 : 0 : exit(0);
665 : : }
666 : : }
667 [ + + ]: 15 : if (newrank != rank) {
668 [ + - ][ + - ]: 6 : dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ][ + - ]
669 : 3 : messenger->set_myname(entity_name_t::MON(newrank));
670 : 3 : rank = newrank;
671 : :
672 : : // reset all connections, or else our peers will think we are someone else.
673 : 3 : messenger->mark_down_all();
674 : : }
675 : :
676 : : // reset
677 : 15 : state = STATE_PROBING;
678 : :
679 : 15 : reset();
680 : :
681 : : // singleton monitor?
682 [ # # ][ - + ]: 15 : if (monmap->size() == 1 && rank == 0) {
[ - + ]
683 : 0 : win_standalone_election();
684 : 15 : return;
685 : : }
686 : :
687 : 15 : reset_probe_timeout();
688 : :
689 : : // i'm outside the quorum
690 [ + - ]: 15 : if (monmap->contains(name))
691 : 15 : outside_quorum.insert(name);
692 : :
693 : : // probe monitors
694 [ - + ][ # # ]: 15 : dout(10) << "probing other monitors" << dendl;
[ # # ][ # # ]
695 [ + + ]: 60 : for (unsigned i = 0; i < monmap->size(); i++) {
696 [ + + ]: 45 : if ((int)i != rank)
697 [ + - ]: 30 : messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
698 : 60 : monmap->get_inst(i));
699 : : }
700 [ - + ]: 15 : for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
701 : 30 : p != extra_probe_peers.end();
702 : : ++p) {
703 [ # # ]: 0 : if (*p != messenger->get_myaddr()) {
704 : 0 : entity_inst_t i;
705 : 0 : i.name = entity_name_t::MON(-1);
706 : 0 : i.addr = *p;
707 [ # # ]: 0 : messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
708 : : }
709 : : }
710 : : }
711 : :
712 : 0 : void Monitor::_add_bootstrap_peer_hint(string cmd, string args, ostream& ss)
713 : : {
714 [ # # ][ # # ]: 0 : dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '" << args << "'" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
715 : :
716 : 0 : entity_addr_t addr;
717 : 0 : const char *end = 0;
718 [ # # ]: 0 : if (!addr.parse(args.c_str(), &end)) {
719 : 0 : ss << "failed to parse addr '" << args << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
720 : 0 : return;
721 : : }
722 : :
723 [ # # ][ # # ]: 0 : if (is_leader() || is_peon()) {
[ # # ]
724 : 0 : ss << "mon already active; ignoring bootstrap hint";
725 : 0 : return;
726 : : }
727 : :
728 [ # # ]: 0 : if (addr.get_port() == 0)
729 : 0 : addr.set_port(CEPH_MON_PORT);
730 : :
731 : 0 : extra_probe_peers.insert(addr);
732 : 0 : ss << "adding peer " << addr << " to list: " << extra_probe_peers;
733 : : }
734 : :
735 : : // called by bootstrap(), or on leader|peon -> electing
736 : 21 : void Monitor::reset()
737 : : {
738 [ - + ][ # # ]: 21 : dout(10) << "reset" << dendl;
[ # # ][ # # ]
739 : :
740 : 21 : timecheck_cleanup();
741 : :
742 : 21 : leader_since = utime_t();
743 [ + + ]: 21 : if (!quorum.empty()) {
744 : 14 : exited_quorum = ceph_clock_now(g_ceph_context);
745 : : }
746 : 21 : quorum.clear();
747 : 21 : outside_quorum.clear();
748 : :
749 : 21 : paxos_recovered.clear();
750 : 21 : global_version = 0;
751 : :
752 [ + + ]: 147 : for (list<Paxos*>::iterator p = paxos.begin(); p != paxos.end(); p++)
753 : 126 : (*p)->restart();
754 [ + + ]: 147 : for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
755 : 126 : (*p)->restart();
756 : 21 : }
757 : :
758 : 49 : void Monitor::cancel_probe_timeout()
759 : : {
760 [ + + ]: 49 : if (probe_timeout_event) {
761 [ - + ][ # # ]: 14 : dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
[ # # ][ # # ]
762 : 14 : timer.cancel_event(probe_timeout_event);
763 : 14 : probe_timeout_event = NULL;
764 : : } else {
765 [ - + ][ # # ]: 35 : dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
[ # # ][ # # ]
766 : : }
767 : 49 : }
768 : :
769 : 15 : void Monitor::reset_probe_timeout()
770 : : {
771 : 15 : cancel_probe_timeout();
772 : 15 : probe_timeout_event = new C_ProbeTimeout(this);
773 [ + - ]: 15 : double t = is_probing() ? g_conf->mon_probe_timeout : g_conf->mon_slurp_timeout;
774 : 15 : timer.add_event_after(t, probe_timeout_event);
775 [ # # ][ # # ]: 15 : dout(10) << "reset_probe_timeout " << probe_timeout_event << " after " << t << " seconds" << dendl;
[ # # ][ # # ]
[ # # ][ - + ]
776 : 15 : }
777 : :
778 : 1 : void Monitor::probe_timeout(int r)
779 : : {
780 [ + - ][ + - ]: 2 : dout(4) << "probe_timeout " << probe_timeout_event << dendl;
[ + - ][ + - ]
781 [ - + ][ # # ]: 1 : assert(is_probing() || is_slurping());
782 [ - + ]: 1 : assert(probe_timeout_event);
783 : 1 : probe_timeout_event = NULL;
784 : 1 : bootstrap();
785 : 1 : }
786 : :
787 : 60 : void Monitor::handle_probe(MMonProbe *m)
788 : : {
789 [ - + ][ # # ]: 60 : dout(10) << "handle_probe " << *m << dendl;
[ # # ][ # # ]
[ # # ]
790 : :
791 [ - + ]: 60 : if (m->fsid != monmap->fsid) {
792 [ # # ][ # # ]: 0 : dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
793 : 0 : m->put();
794 : 60 : return;
795 : : }
796 : :
797 [ + + - - : 60 : switch (m->op) {
- - ]
798 : : case MMonProbe::OP_PROBE:
799 : 30 : handle_probe_probe(m);
800 : 30 : break;
801 : :
802 : : case MMonProbe::OP_REPLY:
803 : 30 : handle_probe_reply(m);
804 : 30 : break;
805 : :
806 : : case MMonProbe::OP_SLURP:
807 : 0 : handle_probe_slurp(m);
808 : 0 : break;
809 : :
810 : : case MMonProbe::OP_SLURP_LATEST:
811 : 0 : handle_probe_slurp_latest(m);
812 : 0 : break;
813 : :
814 : : case MMonProbe::OP_DATA:
815 : 0 : handle_probe_data(m);
816 : 0 : break;
817 : :
818 : : default:
819 : 0 : m->put();
820 : : }
821 : : }
822 : :
823 : 30 : void Monitor::handle_probe_probe(MMonProbe *m)
824 : : {
825 [ - + ][ # # ]: 30 : dout(10) << "handle_probe_probe " << m->get_source_inst() << *m << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
826 [ + - ]: 30 : MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
827 : 30 : r->name = name;
828 : 30 : r->quorum = quorum;
829 : 30 : monmap->encode(r->monmap_bl, m->get_connection()->get_features());
830 [ + + ]: 210 : for (list<Paxos*>::iterator p = paxos.begin(); p != paxos.end(); ++p)
831 [ + - ]: 180 : r->paxos_versions[(*p)->get_machine_name()] = (*p)->get_version();
832 : 30 : messenger->send_message(r, m->get_connection());
833 : :
834 : : // did we discover a peer here?
835 [ - + ]: 60 : if (!monmap->contains(m->get_source_addr())) {
836 [ # # ][ # # ]: 0 : dout(1) << " adding peer " << m->get_source_addr() << " to list of hints" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
837 : 0 : extra_probe_peers.insert(m->get_source_addr());
838 : : }
839 : :
840 : 30 : m->put();
841 : 30 : }
842 : :
843 : 30 : void Monitor::handle_probe_reply(MMonProbe *m)
844 : : {
845 [ - + ][ # # ]: 30 : dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
846 [ - + ][ # # ]: 30 : dout(10) << " monmap is " << *monmap << dendl;
[ # # ][ # # ]
847 : :
848 [ + + ]: 30 : if (!is_probing()) {
849 : 16 : m->put();
850 : : return;
851 : : }
852 : :
853 : : // newer map, or they've joined a quorum and we haven't?
854 : 14 : bufferlist mybl;
855 [ + - ]: 14 : monmap->encode(mybl, m->get_connection()->get_features());
856 : : // make sure it's actually different; the checks below err toward
857 : : // taking the other guy's map, which could cause us to loop.
858 [ + - ][ + + ]: 14 : if (!mybl.contents_equal(m->monmap_bl)) {
859 [ + - ]: 4 : MonMap *newmap = new MonMap;
860 [ + - ]: 4 : newmap->decode(m->monmap_bl);
861 [ + + ][ - + ]: 4 : if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
[ # # ][ + + ]
862 : 0 : !has_ever_joined)) {
863 [ + - ][ - + ]: 1 : dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
[ # # ][ # # ]
[ # # ][ # # ]
864 [ # # ][ # # ]: 0 : << ", mine was " << monmap->get_epoch() << dendl;
[ # # ]
865 [ + - ]: 1 : delete newmap;
866 [ + - ]: 1 : monmap->decode(m->monmap_bl);
867 : 1 : m->put();
868 : :
869 [ + - ]: 1 : bootstrap();
870 : : return;
871 : : }
872 [ + - ]: 3 : delete newmap;
873 : : }
874 : :
875 : : // rename peer?
876 [ + - ]: 13 : string peer_name = monmap->get_name(m->get_source_addr());
877 [ + + ][ + - ]: 13 : if (monmap->get_epoch() == 0 && peer_name.find("noname-") == 0) {
[ + - ][ - + ]
878 [ # # ][ # # ]: 0 : dout(10) << " renaming peer " << m->get_source_addr() << " "
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
879 [ # # ][ # # ]: 0 : << peer_name << " -> " << m->name << " in my monmap"
[ # # ][ # # ]
880 [ # # ][ # # ]: 0 : << dendl;
881 [ # # ][ # # ]: 0 : monmap->rename(peer_name, m->name);
[ # # ]
882 : : } else {
883 [ + - ][ - + ]: 13 : dout(10) << " peer name is " << peer_name << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
884 : : }
885 : :
886 : : // new initial peer?
887 [ + - ]: 13 : if (monmap->contains(m->name)) {
888 [ + - ][ - + ]: 13 : if (monmap->get_addr(m->name).is_blank_ip()) {
889 [ # # ][ # # ]: 0 : dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
890 [ # # ]: 0 : monmap->set_addr(m->name, m->get_source_addr());
891 : 0 : m->put();
892 : :
893 [ # # ]: 0 : bootstrap();
894 : : return;
895 : : }
896 : : }
897 : :
898 : : // is there an existing quorum?
899 [ + + ]: 13 : if (m->quorum.size()) {
900 [ + - ][ - + ]: 2 : dout(10) << " existing quorum " << m->quorum << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
901 : :
902 : : // do i need to catch up?
903 : 2 : bool ok = true;
904 [ + + ]: 14 : for (map<string,version_t>::iterator p = m->paxos_versions.begin();
905 : 28 : p != m->paxos_versions.end();
906 : : ++p) {
907 [ + - ]: 12 : Paxos *pax = get_paxos_by_name(p->first);
908 [ - + ]: 12 : if (!pax) {
909 [ # # ][ # # ]: 0 : dout(0) << " peer has paxos machine " << p->first << " but i don't... weird" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
910 : 0 : continue; // weird!
911 : : }
912 [ - + ]: 12 : if (pax->is_slurping()) {
913 [ # # ][ # # ]: 0 : dout(10) << " My paxos machine " << p->first
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
914 [ # # ]: 0 : << " is currently slurping, so that will continue. Peer has v "
915 [ # # ][ # # ]: 0 : << p->second << dendl;
916 : : ok = false;
917 [ - + ]: 12 : } else if (pax->get_version() + g_conf->paxos_max_join_drift < p->second) {
918 [ # # ][ # # ]: 0 : dout(10) << " peer paxos machine " << p->first << " v " << p->second
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
919 [ # # ]: 0 : << " vs my v " << pax->get_version()
920 [ # # ]: 0 : << " (too far ahead)"
921 [ # # ][ # # ]: 0 : << dendl;
922 : : ok = false;
923 : : } else {
924 [ + - ][ - + ]: 12 : dout(10) << " peer paxos machine " << p->first << " v " << p->second
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
925 [ # # ]: 0 : << " vs my v " << pax->get_version()
926 [ # # ]: 0 : << " (ok)"
927 [ # # ][ # # ]: 0 : << dendl;
928 : : }
929 : : }
930 [ + - ]: 2 : if (ok) {
931 [ + - ]: 4 : if (monmap->contains(name) &&
[ + - - + ]
932 [ + - ]: 2 : !monmap->get_addr(name).is_blank_ip()) {
933 : : // i'm part of the cluster; just initiate a new election
934 [ + - ]: 2 : start_election();
935 : : } else {
936 [ # # ][ # # ]: 0 : dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
937 [ # # ]: 0 : messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
938 [ # # ][ # # ]: 0 : monmap->get_inst(*m->quorum.begin()));
[ # # ][ # # ]
939 : : }
940 : : } else {
941 : 0 : slurp_source = m->get_source_inst();
942 : 0 : slurp_versions = m->paxos_versions;
943 [ # # ]: 0 : slurp();
944 : : }
945 : : } else {
946 : : // not part of a quorum
947 [ + - ]: 11 : if (monmap->contains(m->name))
948 [ + - ]: 11 : outside_quorum.insert(m->name);
949 : : else
950 [ # # ][ # # ]: 0 : dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
951 : :
952 : 22 : unsigned need = monmap->size() / 2 + 1;
953 [ + - ][ - + ]: 11 : dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
954 : :
955 [ + - ]: 11 : if (outside_quorum.size() >= need) {
956 [ + - ]: 11 : if (outside_quorum.count(name)) {
957 [ + - ][ - + ]: 11 : dout(10) << " that's enough to form a new quorum, calling election" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
958 [ + - ]: 11 : start_election();
959 : : } else {
960 [ # # ][ # # ]: 0 : dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
961 : : }
962 : : } else {
963 [ # # ][ # # ]: 0 : dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
964 : : }
965 : : }
966 : :
967 : 30 : m->put();
968 : : }
969 : :
970 : : /*
971 : : * The whole slurp process is currently a bit of a hack. Given the
972 : : * current storage model, we should be sharing code with Paxos to make
973 : : * sure we copy the right content. But that model sucks and will
974 : : * hopefully soon change, and it's less work to kludge around it here
975 : : * than it is to make the current model clean.
976 : : *
977 : : * So: more or less duplicate the work of resyncing each paxos state
978 : : * machine here. And move the monitor storage refactor stuff up the
979 : : * todo list.
980 : : *
981 : : */
982 : :
983 : 0 : void Monitor::slurp()
984 : : {
985 [ # # ][ # # ]: 0 : dout(10) << "slurp " << slurp_source << " " << slurp_versions << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
986 : :
987 : 0 : reset_probe_timeout();
988 : :
989 : 0 : state = STATE_SLURPING;
990 : :
991 : 0 : map<string,version_t>::iterator p = slurp_versions.begin();
992 [ # # ]: 0 : while (p != slurp_versions.end()) {
993 : 0 : Paxos *pax = get_paxos_by_name(p->first);
994 [ # # ]: 0 : if (!pax) {
995 : : p++;
996 : 0 : continue;
997 : : }
998 : :
999 [ # # ][ # # ]: 0 : dout(10) << " " << p->first << " v " << p->second << " vs my " << pax->get_version() << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1000 [ # # ][ # # ]: 0 : if (p->second > pax->get_version() ||
[ # # ]
1001 : : pax->get_stashed_version() > pax->get_version()) {
1002 [ # # ]: 0 : if (!pax->is_slurping()) {
1003 : 0 : pax->start_slurping();
1004 : : }
1005 [ # # ]: 0 : MMonProbe *m = new MMonProbe(monmap->fsid, MMonProbe::OP_SLURP, name, has_ever_joined);
1006 : 0 : m->machine_name = p->first;
1007 : 0 : m->oldest_version = pax->get_first_committed();
1008 : 0 : m->newest_version = pax->get_version();
1009 : 0 : messenger->send_message(m, slurp_source);
1010 : 0 : return;
1011 : : }
1012 : :
1013 : : // latest?
1014 [ # # ][ # # ]: 0 : if (pax->get_first_committed() > 1 && // don't need it!
[ # # ]
1015 : : pax->get_stashed_version() < pax->get_first_committed()) {
1016 [ # # ]: 0 : if (!pax->is_slurping()) {
1017 : 0 : pax->start_slurping();
1018 : : }
1019 [ # # ]: 0 : MMonProbe *m = new MMonProbe(monmap->fsid, MMonProbe::OP_SLURP_LATEST, name, has_ever_joined);
1020 : 0 : m->machine_name = p->first;
1021 : 0 : m->oldest_version = pax->get_first_committed();
1022 : 0 : m->newest_version = pax->get_version();
1023 : 0 : messenger->send_message(m, slurp_source);
1024 : 0 : return;
1025 : : }
1026 : :
1027 : 0 : PaxosService *paxs = get_paxos_service_by_name(p->first);
1028 [ # # ]: 0 : assert(paxs);
1029 : 0 : paxs->update_from_paxos();
1030 : :
1031 : 0 : pax->end_slurping();
1032 : :
1033 : 0 : slurp_versions.erase(p++);
1034 : : }
1035 : :
1036 [ # # ][ # # ]: 0 : dout(10) << "done slurping" << dendl;
[ # # ][ # # ]
1037 : 0 : bootstrap();
1038 : : }
1039 : :
1040 : 0 : MMonProbe *Monitor::fill_probe_data(MMonProbe *m, Paxos *pax)
1041 : : {
1042 [ # # ]: 0 : MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_DATA, name, has_ever_joined);
1043 : 0 : r->machine_name = m->machine_name;
1044 : 0 : r->oldest_version = pax->get_first_committed();
1045 : 0 : r->newest_version = pax->get_version();
1046 : :
1047 [ # # ]: 0 : version_t v = MAX(pax->get_first_committed(), m->newest_version + 1);
1048 : 0 : int len = 0;
1049 [ # # ]: 0 : for (; v <= pax->get_version(); v++) {
1050 : 0 : store->get_bl_sn_safe(r->paxos_values[m->machine_name][v], m->machine_name.c_str(), v);
1051 : 0 : len += r->paxos_values[m->machine_name][v].length();
1052 : 0 : r->gv[m->machine_name][v] = store->get_global_version(m->machine_name.c_str(), v);
1053 [ # # ]: 0 : for (list<string>::iterator p = pax->extra_state_dirs.begin();
1054 : 0 : p != pax->extra_state_dirs.end();
1055 : : ++p) {
1056 : 0 : store->get_bl_sn_safe(r->paxos_values[*p][v], p->c_str(), v);
1057 : 0 : len += r->paxos_values[*p][v].length();
1058 : : }
1059 [ # # ]: 0 : if (len >= g_conf->mon_slurp_bytes)
1060 : : break;
1061 : : }
1062 : :
1063 : 0 : return r;
1064 : : }
1065 : :
1066 : 0 : void Monitor::handle_probe_slurp(MMonProbe *m)
1067 : : {
1068 [ # # ][ # # ]: 0 : dout(10) << "handle_probe_slurp " << *m << dendl;
[ # # ][ # # ]
[ # # ]
1069 : :
1070 : 0 : Paxos *pax = get_paxos_by_name(m->machine_name);
1071 [ # # ]: 0 : assert(pax);
1072 : :
1073 : 0 : MMonProbe *r = fill_probe_data(m, pax);
1074 : 0 : messenger->send_message(r, m->get_connection());
1075 : 0 : m->put();
1076 : 0 : }
1077 : :
1078 : 0 : void Monitor::handle_probe_slurp_latest(MMonProbe *m)
1079 : : {
1080 [ # # ][ # # ]: 0 : dout(10) << "handle_probe_slurp_latest " << *m << dendl;
[ # # ][ # # ]
[ # # ]
1081 : :
1082 : 0 : Paxos *pax = get_paxos_by_name(m->machine_name);
1083 [ # # ]: 0 : assert(pax);
1084 : :
1085 : 0 : MMonProbe *r = fill_probe_data(m, pax);
1086 : 0 : r->latest_version = pax->get_stashed(r->latest_value);
1087 : :
1088 : 0 : messenger->send_message(r, m->get_connection());
1089 : 0 : m->put();
1090 : 0 : }
1091 : :
1092 : 0 : void Monitor::handle_probe_data(MMonProbe *m)
1093 : : {
1094 [ # # ][ # # ]: 0 : dout(10) << "handle_probe_data " << *m << dendl;
[ # # ][ # # ]
[ # # ]
1095 : :
1096 : 0 : Paxos *pax = get_paxos_by_name(m->machine_name);
1097 [ # # ]: 0 : assert(pax);
1098 : :
1099 : : // trim old cruft?
1100 [ # # ]: 0 : if (m->oldest_version > pax->get_first_committed())
1101 : 0 : pax->trim_to(m->oldest_version, true);
1102 : :
1103 : : // note new latest version?
1104 [ # # ]: 0 : if (slurp_versions.count(m->machine_name))
1105 : 0 : slurp_versions[m->machine_name] = m->newest_version;
1106 : :
1107 : : // store any new stuff
1108 [ # # ]: 0 : if (m->paxos_values.size()) {
1109 [ # # ]: 0 : for (map<string, map<version_t, bufferlist> >::iterator p = m->paxos_values.begin();
1110 : 0 : p != m->paxos_values.end();
1111 : : ++p) {
1112 : 0 : store->put_bl_sn_map(p->first.c_str(), p->second.begin(), p->second.end(), &m->gv[p->first]);
1113 : : }
1114 : :
1115 : 0 : pax->last_committed = m->paxos_values.begin()->second.rbegin()->first;
1116 : : store->put_int(pax->last_committed, m->machine_name.c_str(),
1117 : 0 : "last_committed");
1118 : : }
1119 : :
1120 : : // latest?
1121 [ # # ]: 0 : if (m->latest_version) {
1122 : 0 : pax->stash_latest(m->latest_version, m->latest_value);
1123 : : }
1124 : :
1125 : 0 : m->put();
1126 : :
1127 : 0 : slurp();
1128 : 0 : }
1129 : :
1130 : 19 : void Monitor::start_election()
1131 : : {
1132 [ - + ][ # # ]: 19 : dout(10) << "start_election" << dendl;
[ # # ][ # # ]
1133 : :
1134 : 19 : cancel_probe_timeout();
1135 : :
1136 : : // call a new election
1137 : 19 : state = STATE_ELECTING;
1138 [ + - ][ + - ]: 19 : clog.info() << "mon." << name << " calling new monitor election\n";
1139 : 19 : elector.call_election();
1140 : 19 : }
1141 : :
1142 : 0 : void Monitor::win_standalone_election()
1143 : : {
1144 [ # # ][ # # ]: 0 : dout(1) << "win_standalone_election" << dendl;
[ # # ][ # # ]
1145 : :
1146 : : // bump election epoch, in case the previous epoch included other
1147 : : // monitors; we need to be able to make the distinction.
1148 : 0 : elector.advance_epoch();
1149 : :
1150 : 0 : rank = monmap->get_rank(name);
1151 [ # # ]: 0 : assert(rank == 0);
1152 : : set<int> q;
1153 [ # # ]: 0 : q.insert(rank);
1154 [ # # ]: 0 : win_election(1, q, CEPH_FEATURES_ALL);
1155 : 0 : }
1156 : :
1157 : 3 : const utime_t& Monitor::get_leader_since() const
1158 : : {
1159 [ - + ]: 3 : assert(state == STATE_LEADER);
1160 : 3 : return leader_since;
1161 : : }
1162 : :
1163 : 948 : epoch_t Monitor::get_epoch()
1164 : : {
1165 : 948 : return elector.get_epoch();
1166 : : }
1167 : :
1168 : 6 : void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features)
1169 : : {
1170 [ - + ]: 6 : if (!is_electing())
1171 : 0 : reset();
1172 : :
1173 : 6 : state = STATE_LEADER;
1174 : 6 : leader_since = ceph_clock_now(g_ceph_context);
1175 : 6 : leader = rank;
1176 : 6 : quorum = active;
1177 : 6 : quorum_features = features;
1178 : 6 : outside_quorum.clear();
1179 [ # # ][ # # ]: 6 : dout(10) << "win_election, epoch " << epoch << " quorum is " << quorum
[ # # ][ # # ]
[ - + ]
1180 [ # # ]: 0 : << " features are " << quorum_features
1181 [ # # ]: 0 : << dendl;
1182 : :
1183 [ + - ][ + - ]: 12 : clog.info() << "mon." << name << "@" << rank
[ + - ]
1184 [ + - ][ + - ]: 6 : << " won leader election with quorum " << quorum << "\n";
[ + - ]
1185 : :
1186 [ + + ]: 42 : for (list<Paxos*>::iterator p = paxos.begin(); p != paxos.end(); p++)
1187 : 36 : (*p)->leader_init();
1188 [ + + ]: 42 : for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
1189 : 36 : (*p)->election_finished();
1190 : :
1191 : 6 : finish_election();
1192 [ + - ]: 6 : if (monmap->size() > 1)
1193 : 6 : timecheck();
1194 : 6 : }
1195 : :
1196 : 11 : void Monitor::lose_election(epoch_t epoch, set<int> &q, int l, uint64_t features)
1197 : : {
1198 : 11 : state = STATE_PEON;
1199 : 11 : leader_since = utime_t();
1200 : 11 : leader = l;
1201 : 11 : quorum = q;
1202 : 11 : outside_quorum.clear();
1203 : 11 : quorum_features = features;
1204 [ # # ][ # # ]: 11 : dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
[ # # ][ # # ]
[ - + ]
1205 [ # # ][ # # ]: 0 : << " quorum is " << quorum << " features are " << quorum_features << dendl;
[ # # ][ # # ]
1206 : :
1207 [ + + ]: 77 : for (list<Paxos*>::iterator p = paxos.begin(); p != paxos.end(); p++)
1208 : 66 : (*p)->peon_init();
1209 [ + + ]: 77 : for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
1210 : 66 : (*p)->election_finished();
1211 : :
1212 : 11 : finish_election();
1213 : 11 : }
1214 : :
1215 : 17 : void Monitor::finish_election()
1216 : : {
1217 : 17 : exited_quorum = utime_t();
1218 : 17 : finish_contexts(g_ceph_context, waitfor_quorum);
1219 : 17 : finish_contexts(g_ceph_context, maybe_wait_for_quorum);
1220 : 17 : resend_routed_requests();
1221 : 17 : update_logger();
1222 : 17 : register_cluster_logger();
1223 : :
1224 : : // am i named properly?
1225 : 17 : string cur_name = monmap->get_name(messenger->get_myaddr());
1226 [ - + ]: 17 : if (cur_name != name) {
1227 [ # # ][ # # ]: 0 : dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1228 [ # # ]: 0 : messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
1229 [ # # ][ # # ]: 0 : monmap->get_inst(*quorum.begin()));
[ # # ][ # # ]
1230 : 17 : }
1231 : 17 : }
1232 : :
1233 : :
1234 : 5 : bool Monitor::_allowed_command(MonSession *s, const vector<string>& cmd)
1235 : : {
1236 [ - + ]: 5 : for (list<list<string> >::iterator p = s->caps.cmd_allow.begin();
1237 : 10 : p != s->caps.cmd_allow.end();
1238 : : ++p) {
1239 : : list<string>::iterator q;
1240 : : unsigned i;
1241 [ # # ][ # # ]: 0 : dout(0) << "cmd " << cmd << " vs " << *p << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1242 [ # # ][ # # ]: 0 : for (q = p->begin(), i = 0; q != p->end() && i < cmd.size(); ++q, ++i) {
[ # # ]
1243 [ # # ]: 0 : if (*q == "*")
1244 : 0 : continue;
1245 [ # # ]: 0 : if (*q == "...") {
1246 : 0 : i = cmd.size() - 1;
1247 : 0 : continue;
1248 : : }
1249 [ # # ]: 0 : if (*q != cmd[i])
1250 : : break;
1251 : : }
1252 [ # # ][ # # ]: 0 : if (q == p->end() && i == cmd.size())
[ # # ]
1253 : : return true; // match
1254 : : }
1255 : :
1256 : : return false;
1257 : : }
1258 : :
1259 : 0 : void Monitor::_quorum_status(ostream& ss)
1260 : : {
1261 : 0 : JSONFormatter jf(true);
1262 [ # # ]: 0 : jf.open_object_section("quorum_status");
1263 [ # # ]: 0 : jf.dump_int("election_epoch", get_epoch());
1264 : :
1265 [ # # ]: 0 : jf.open_array_section("quorum");
1266 [ # # ]: 0 : for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
1267 [ # # ]: 0 : jf.dump_int("mon", *p);
1268 [ # # ]: 0 : jf.close_section();
1269 : :
1270 [ # # ]: 0 : jf.open_object_section("monmap");
1271 [ # # ]: 0 : monmap->dump(&jf);
1272 [ # # ]: 0 : jf.close_section();
1273 : :
1274 [ # # ]: 0 : jf.close_section();
1275 [ # # ]: 0 : jf.flush(ss);
1276 : 0 : }
1277 : :
1278 : 0 : void Monitor::_mon_status(ostream& ss)
1279 : : {
1280 : 0 : JSONFormatter jf(true);
1281 [ # # ]: 0 : jf.open_object_section("mon_status");
1282 [ # # ][ # # ]: 0 : jf.dump_string("name", name);
1283 [ # # ]: 0 : jf.dump_int("rank", rank);
1284 [ # # ][ # # ]: 0 : jf.dump_string("state", get_state_name());
1285 [ # # ]: 0 : jf.dump_int("election_epoch", get_epoch());
1286 : :
1287 [ # # ]: 0 : jf.open_array_section("quorum");
1288 [ # # ]: 0 : for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
1289 [ # # ]: 0 : jf.dump_int("mon", *p);
1290 [ # # ]: 0 : jf.close_section();
1291 : :
1292 [ # # ]: 0 : jf.open_array_section("outside_quorum");
1293 [ # # ]: 0 : for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
1294 [ # # ][ # # ]: 0 : jf.dump_string("mon", *p);
1295 [ # # ]: 0 : jf.close_section();
1296 : :
1297 [ # # ]: 0 : if (is_slurping()) {
1298 [ # # ][ # # ]: 0 : jf.dump_stream("slurp_source") << slurp_source;
1299 [ # # ]: 0 : jf.open_object_section("slurp_version");
1300 [ # # ]: 0 : for (map<string,version_t>::iterator p = slurp_versions.begin(); p != slurp_versions.end(); ++p)
1301 [ # # ]: 0 : jf.dump_int(p->first.c_str(), p->second);
1302 [ # # ]: 0 : jf.close_section();
1303 : : }
1304 : :
1305 [ # # ]: 0 : jf.open_object_section("monmap");
1306 [ # # ]: 0 : monmap->dump(&jf);
1307 [ # # ]: 0 : jf.close_section();
1308 : :
1309 [ # # ]: 0 : jf.close_section();
1310 : :
1311 [ # # ]: 0 : jf.flush(ss);
1312 : 0 : }
1313 : :
1314 : 1 : void Monitor::get_health(string& status, bufferlist *detailbl, Formatter *f)
1315 : : {
1316 : : list<pair<health_status_t,string> > summary;
1317 : : list<pair<health_status_t,string> > detail;
1318 : :
1319 [ - + ]: 1 : if (f)
1320 [ # # ]: 0 : f->open_object_section("health");
1321 : :
1322 [ + + ]: 7 : for (vector<PaxosService*>::iterator p = paxos_service.begin();
1323 : 14 : p != paxos_service.end();
1324 : : p++) {
1325 : 6 : PaxosService *s = *p;
1326 [ + - ][ + - ]: 6 : s->get_health(summary, detailbl ? &detail : NULL);
1327 : : }
1328 : :
1329 [ - + ]: 1 : if (f)
1330 [ # # ]: 0 : f->open_array_section("summary");
1331 [ + - ]: 1 : stringstream ss;
1332 : 1 : health_status_t overall = HEALTH_OK;
1333 [ - + ]: 1 : if (!summary.empty()) {
1334 [ # # ]: 0 : if (f) {
1335 [ # # ]: 0 : f->open_object_section("item");
1336 [ # # ][ # # ]: 0 : f->dump_stream("severity") << summary.front().first;
1337 [ # # ][ # # ]: 0 : f->dump_string("summary", summary.front().second);
1338 [ # # ]: 0 : f->close_section();
1339 : : }
1340 [ # # ]: 0 : ss << ' ';
1341 [ # # ]: 0 : while (!summary.empty()) {
1342 [ # # ]: 0 : if (overall > summary.front().first)
1343 : 0 : overall = summary.front().first;
1344 [ # # ]: 0 : ss << summary.front().second;
1345 : 0 : summary.pop_front();
1346 [ # # ]: 0 : if (!summary.empty())
1347 [ # # ]: 0 : ss << "; ";
1348 : : }
1349 : : }
1350 [ - + ]: 1 : if (f)
1351 [ # # ]: 0 : f->close_section();
1352 : :
1353 [ - + ]: 1 : if (f)
1354 [ # # ]: 0 : f->open_array_section("timechecks");
1355 [ + - ]: 1 : if (timecheck_skews.size() != 0) {
1356 : : list<string> warns;
1357 [ + + ]: 4 : for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
1358 : 8 : i != timecheck_skews.end(); ++i) {
1359 : 3 : entity_inst_t inst = i->first;
1360 : 3 : double skew = i->second;
1361 [ + - ]: 3 : double latency = timecheck_latencies[inst];
1362 [ + - ]: 3 : string name = monmap->get_name(inst.addr);
1363 : :
1364 [ + - ]: 3 : ostringstream tcss;
1365 [ + - ]: 3 : health_status_t tcstatus = timecheck_status(tcss, skew, latency);
1366 [ - + ]: 3 : if (tcstatus != HEALTH_OK) {
1367 : 0 : overall = tcstatus;
1368 [ # # ]: 0 : warns.push_back(name);
1369 : :
1370 [ # # ]: 0 : ostringstream tmp_ss;
1371 [ # # ][ # # ]: 0 : tmp_ss << "mon." << name
1372 [ # # ][ # # ]: 0 : << " addr " << inst.addr << " " << tcss.str()
[ # # ][ # # ]
1373 [ # # ][ # # ]: 0 : << " (latency " << latency << "s)";
1374 [ # # ]: 0 : detail.push_back(make_pair(tcstatus, tmp_ss.str()));
1375 : : }
1376 : :
1377 [ - + ]: 3 : if (f) {
1378 [ # # ]: 0 : f->open_object_section(name.c_str());
1379 [ # # ][ # # ]: 0 : f->dump_string("name", name.c_str());
1380 [ # # ]: 0 : f->dump_float("skew", skew);
1381 [ # # ]: 0 : f->dump_float("latency", latency);
1382 [ # # ][ # # ]: 0 : f->dump_stream("health") << tcstatus;
1383 [ # # ]: 0 : if (tcstatus != HEALTH_OK)
1384 [ # # ][ # # ]: 0 : f->dump_stream("details") << tcss.str();
1385 [ # # ]: 0 : f->close_section();
1386 : : }
1387 [ + - ]: 3 : }
1388 [ - + ]: 1 : if (!warns.empty()) {
1389 [ # # ]: 0 : if (!ss.str().empty())
1390 [ # # ]: 0 : ss << ";";
1391 [ # # ]: 0 : ss << " clock skew detected on";
1392 [ # # ]: 0 : while (!warns.empty()) {
1393 [ # # ][ # # ]: 0 : ss << " mon." << warns.front();
1394 : : warns.pop_front();
1395 [ # # ]: 0 : if (!warns.empty())
1396 [ # # ]: 0 : ss << ",";
1397 : : }
1398 : : }
1399 : : }
1400 [ - + ]: 1 : if (f)
1401 [ # # ]: 0 : f->close_section();
1402 : :
1403 [ + - ]: 1 : stringstream fss;
1404 [ + - ]: 1 : fss << overall;
1405 : 1 : status = fss.str() + ss.str();
1406 [ - + ]: 1 : if (f)
1407 [ # # ][ # # ]: 0 : f->dump_stream("overall_status") << overall;
1408 : :
1409 [ - + ]: 1 : if (f)
1410 [ # # ]: 1 : f->open_array_section("detail");
1411 [ - + ]: 1 : while (!detail.empty()) {
1412 [ # # ]: 0 : if (f)
1413 [ # # ][ # # ]: 0 : f->dump_string("item", detail.front().second);
1414 [ # # ]: 0 : if (detailbl != NULL) {
1415 [ # # ]: 0 : detailbl->append(detail.front().second);
1416 [ # # ]: 0 : detailbl->append('\n');
1417 : : }
1418 : 0 : detail.pop_front();
1419 : : }
1420 [ - + ]: 1 : if (f)
1421 [ # # ]: 0 : f->close_section();
1422 : :
1423 [ - + ]: 1 : if (f)
1424 [ # # ][ + - ]: 1 : f->close_section();
[ + - ]
1425 : 1 : }
1426 : :
1427 : 0 : void Monitor::get_status(stringstream &ss, Formatter *f)
1428 : : {
1429 [ # # ]: 0 : if (f)
1430 : 0 : f->open_object_section("status");
1431 : :
1432 : : // reply with the status for all the components
1433 : : string health;
1434 [ # # ]: 0 : get_health(health, NULL, f);
1435 : :
1436 [ # # ]: 0 : if (f) {
1437 [ # # ]: 0 : f->dump_stream("monmap") << *monmap;
1438 [ # # ]: 0 : f->dump_stream("election_epoch") << get_epoch();
1439 [ # # ][ # # ]: 0 : f->dump_stream("quorum") << get_quorum();
1440 [ # # ][ # # ]: 0 : f->dump_stream("quorum_names") << get_quorum_names();
[ # # ]
1441 [ # # ]: 0 : f->dump_stream("osdmap") << osdmon()->osdmap;
1442 [ # # ]: 0 : f->dump_stream("pgmap") << pgmon()->pg_map;
1443 [ # # ]: 0 : f->dump_stream("mdsmap") << mdsmon()->mdsmap;
1444 [ # # ]: 0 : f->close_section();
1445 : : } else {
1446 [ # # ][ # # ]: 0 : ss << " health " << health << "\n";
[ # # ]
1447 [ # # ][ # # ]: 0 : ss << " monmap " << *monmap << ", election epoch " << get_epoch()
1448 [ # # ][ # # ]: 0 : << ", quorum " << get_quorum() << " " << get_quorum_names() << "\n";
[ # # ][ # # ]
[ # # ][ # # ]
1449 [ # # ][ # # ]: 0 : ss << " osdmap " << osdmon()->osdmap << "\n";
1450 [ # # ][ # # ]: 0 : ss << " pgmap " << pgmon()->pg_map << "\n";
1451 [ # # ][ # # ]: 0 : ss << " mdsmap " << mdsmon()->mdsmap << "\n";
1452 : 0 : }
1453 : 0 : }
1454 : :
1455 : 5 : void Monitor::handle_command(MMonCommand *m)
1456 : : {
1457 [ - + ]: 5 : if (m->fsid != monmap->fsid) {
1458 [ # # ][ # # ]: 0 : dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1459 [ # # ]: 0 : reply_command(m, -EPERM, "wrong fsid", 0);
1460 : : return;
1461 : : }
1462 : :
1463 : 10 : MonSession *session = m->get_session();
1464 [ - + ]: 5 : if (!session) {
1465 : 0 : string rs = "Access denied";
1466 [ # # ]: 0 : reply_command(m, -EACCES, rs, 0);
1467 : 0 : return;
1468 : : }
1469 : :
1470 : 5 : bool access_cmd = _allowed_command(session, m->cmd);
1471 : 5 : bool access_r = (session->caps.check_privileges(PAXOS_MONMAP, MON_CAP_R) ||
1472 [ # # ][ - + ]: 5 : access_cmd);
1473 [ - + ][ # # ]: 5 : bool access_all = (session->caps.get_allow_all() || access_cmd);
1474 : :
1475 [ + - ][ + - ]: 10 : dout(0) << "handle_command " << *m << dendl;
[ + - ][ + - ]
[ + - ]
1476 : 5 : bufferlist rdata;
1477 : : string rs;
1478 : 5 : int r = -EINVAL;
1479 : : rs = "unrecognized command";
1480 [ + - ]: 5 : if (m->cmd.empty())
1481 : : goto out;
1482 : :
1483 [ + - ][ + + ]: 5 : if (m->cmd[0] == "mds") {
1484 [ + - ]: 1 : mdsmon()->dispatch(m);
1485 : : return;
1486 : : }
1487 [ + - ][ + + ]: 4 : if (m->cmd[0] == "osd") {
1488 [ + - ]: 3 : osdmon()->dispatch(m);
1489 : : return;
1490 : : }
1491 [ + - ][ - + ]: 1 : if (m->cmd[0] == "pg") {
1492 [ # # ]: 0 : pgmon()->dispatch(m);
1493 : : return;
1494 : : }
1495 [ + - ][ - + ]: 1 : if (m->cmd[0] == "mon") {
1496 [ # # ]: 0 : monmon()->dispatch(m);
1497 : : return;
1498 : : }
1499 [ + - ][ - + ]: 1 : if (m->cmd[0] == "class") {
1500 [ # # ][ # # ]: 0 : reply_command(m, -EINVAL, "class distribution is no longer handled by the monitor", 0);
1501 : : return;
1502 : : }
1503 [ + - ][ - + ]: 1 : if (m->cmd[0] == "auth") {
1504 [ # # ]: 0 : authmon()->dispatch(m);
1505 : : return;
1506 : : }
1507 : :
1508 [ + - ][ - + ]: 1 : if (m->cmd[0] == "fsid") {
1509 [ # # ]: 0 : stringstream ss;
1510 [ # # ]: 0 : ss << monmap->fsid;
1511 [ # # ]: 0 : reply_command(m, 0, ss.str(), rdata, 0);
1512 [ # # ]: 0 : return;
1513 : : }
1514 [ + - ][ - + ]: 1 : if (m->cmd[0] == "log") {
1515 [ # # ]: 0 : if (!access_r) {
1516 : 0 : r = -EACCES;
1517 : : rs = "access denied";
1518 : : goto out;
1519 : : }
1520 [ # # ]: 0 : stringstream ss;
1521 [ # # ]: 0 : for (unsigned i=1; i<m->cmd.size(); i++) {
1522 [ # # ]: 0 : if (i > 1)
1523 [ # # ]: 0 : ss << ' ';
1524 [ # # ]: 0 : ss << m->cmd[i];
1525 : : }
1526 : 0 : clog.info(ss);
1527 : : rs = "ok";
1528 [ # # ]: 0 : reply_command(m, 0, rs, rdata, 0);
1529 [ # # ]: 0 : return;
1530 : : }
1531 [ + - ][ - + ]: 1 : if (m->cmd[0] == "stop_cluster") {
1532 [ # # ]: 0 : if (!access_all) {
1533 : 0 : r = -EACCES;
1534 : : rs = "access denied";
1535 : : goto out;
1536 : : }
1537 [ # # ]: 0 : stop_cluster();
1538 [ # # ][ # # ]: 0 : reply_command(m, 0, "initiating cluster shutdown", 0);
1539 : : return;
1540 : : }
1541 : :
1542 [ + - ][ - + ]: 1 : if (m->cmd[0] == "injectargs") {
1543 [ # # ]: 0 : if (!access_all) {
1544 : 0 : r = -EACCES;
1545 : : rs = "access denied";
1546 : : goto out;
1547 : : }
1548 [ # # ]: 0 : if (m->cmd.size() == 2) {
1549 [ # # ][ # # ]: 0 : dout(0) << "parsing injected options '" << m->cmd[1] << "'" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
1550 [ # # ]: 0 : ostringstream oss;
1551 [ # # ]: 0 : g_conf->injectargs(m->cmd[1], &oss);
1552 [ # # ][ # # ]: 0 : derr << "injectargs:" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
1553 [ # # ][ # # ]: 0 : derr << oss.str() << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
1554 : : rs = "parsed options";
1555 [ # # ]: 0 : r = 0;
1556 : : } else {
1557 : : rs = "must supply options to be parsed in a single string";
1558 : : r = -EINVAL;
1559 : : }
1560 [ + - ][ + - ]: 1 : } else if ((m->cmd[0] == "status") || (m->cmd[0] == "health")) {
[ + - ][ + - ]
[ + - ]
1561 [ - + ]: 1 : if (!access_r) {
1562 : 0 : r = -EACCES;
1563 : : rs = "access denied";
1564 : : goto out;
1565 : : }
1566 : :
1567 : : vector<const char *> args;
1568 [ + + ]: 2 : for (unsigned int i = 0; i < m->cmd.size(); ++i)
1569 : 3 : args.push_back(m->cmd[i].c_str());
1570 : :
1571 [ + - ]: 1 : string format = "plain";
1572 : 1 : JSONFormatter *jf = NULL;
1573 [ + + ]: 2 : for (vector<const char*>::iterator i = args.begin(); i != args.end();) {
1574 : : string val;
1575 [ + - ][ - + ]: 1 : if (ceph_argparse_witharg(args, i, &val,
1576 : : "-f", "--format", (char*)NULL)) {
1577 : : format = val;
1578 : : } else {
1579 : : ++i;
1580 : : }
1581 : 1 : }
1582 : :
1583 [ - + ]: 1 : if (format != "plain") {
1584 [ # # ][ # # ]: 0 : if (format == "json") {
1585 [ # # ][ # # ]: 0 : jf = new JSONFormatter(true);
1586 : : } else {
1587 : 0 : r = -EINVAL;
1588 [ # # ]: 0 : stringstream err_ss;
1589 [ # # ][ # # ]: 0 : err_ss << "unrecognized format '" << format
1590 [ # # ]: 0 : << "' (available: plain, json)";
1591 : 0 : rs = err_ss.str();
1592 [ # # ]: 0 : goto out;
1593 : : }
1594 : : }
1595 : :
1596 [ + - ]: 1 : stringstream ss;
1597 [ + - ][ + - ]: 1 : if (string(args[0]) == "status") {
[ - + ]
1598 [ # # ]: 0 : get_status(ss, jf);
1599 : :
1600 [ # # ]: 0 : if (jf) {
1601 [ # # ]: 0 : jf->flush(ss);
1602 [ # # ]: 0 : ss << '\n';
1603 : : }
1604 [ + - ][ + - ]: 1 : } else if (string(args[0]) == "health") {
[ + - ]
1605 : : string health_str;
1606 [ + - ][ + - ]: 1 : get_health(health_str, (args.size() > 1) ? &rdata : NULL, jf);
1607 [ - + ]: 1 : if (jf) {
1608 [ # # ]: 0 : jf->flush(ss);
1609 [ # # ]: 0 : ss << '\n';
1610 : : } else {
1611 [ + - ]: 1 : ss << health_str;
1612 : 1 : }
1613 : : } else {
1614 : 0 : assert(0 == "We should never get here!");
1615 : : return;
1616 : : }
1617 : 1 : rs = ss.str();
1618 [ + - ]: 1 : r = 0;
1619 [ # # ][ # # ]: 0 : } else if (m->cmd[0] == "report") {
1620 [ # # ]: 0 : if (!access_r) {
1621 : 0 : r = -EACCES;
1622 : : rs = "access denied";
1623 : : goto out;
1624 : : }
1625 : :
1626 [ # # ]: 0 : JSONFormatter jf(true);
1627 : :
1628 [ # # ]: 0 : jf.open_object_section("report");
1629 [ # # ][ # # ]: 0 : jf.dump_string("version", ceph_version_to_str());
[ # # ]
1630 [ # # ][ # # ]: 0 : jf.dump_string("commit", git_version_to_str());
[ # # ]
1631 [ # # ][ # # ]: 0 : jf.dump_stream("timestamp") << ceph_clock_now(NULL);
1632 : :
1633 : : string d;
1634 [ # # ]: 0 : for (unsigned i = 1; i < m->cmd.size(); i++) {
1635 [ # # ]: 0 : if (i > 1)
1636 : : d += " ";
1637 : 0 : d += m->cmd[i];
1638 : : }
1639 [ # # ][ # # ]: 0 : jf.dump_string("tag", d);
1640 : :
1641 : : string hs;
1642 [ # # ]: 0 : get_health(hs, NULL, &jf);
1643 : :
1644 [ # # ]: 0 : monmon()->dump_info(&jf);
1645 [ # # ]: 0 : osdmon()->dump_info(&jf);
1646 [ # # ]: 0 : mdsmon()->dump_info(&jf);
1647 [ # # ]: 0 : pgmon()->dump_info(&jf);
1648 : :
1649 [ # # ]: 0 : jf.close_section();
1650 [ # # ]: 0 : stringstream ss;
1651 [ # # ]: 0 : jf.flush(ss);
1652 : :
1653 [ # # ]: 0 : bufferlist bl;
1654 [ # # ][ # # ]: 0 : bl.append("-------- BEGIN REPORT --------\n");
1655 [ # # ]: 0 : bl.append(ss);
1656 [ # # ]: 0 : ostringstream ss2;
1657 [ # # ][ # # ]: 0 : ss2 << "\n-------- END REPORT " << bl.crc32c(6789) << " --------\n";
1658 [ # # ]: 0 : rdata.append(bl);
1659 [ # # ]: 0 : rdata.append(ss2.str());
1660 : 0 : rs = string();
1661 [ # # ][ # # ]: 0 : r = 0;
[ # # ][ # # ]
1662 [ # # ][ # # ]: 0 : } else if (m->cmd[0] == "quorum_status") {
1663 [ # # ]: 0 : if (!access_r) {
1664 : 0 : r = -EACCES;
1665 : : rs = "access denied";
1666 : : goto out;
1667 : : }
1668 : : // make sure our map is readable and up to date
1669 [ # # ][ # # ]: 0 : if (!is_leader() && !is_peon()) {
[ # # ]
1670 [ # # ][ # # ]: 0 : dout(10) << " waiting for quorum" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
1671 [ # # ][ # # ]: 0 : waitfor_quorum.push_back(new C_RetryMessage(this, m));
1672 : : return;
1673 : : }
1674 [ # # ]: 0 : stringstream ss;
1675 [ # # ]: 0 : _quorum_status(ss);
1676 : 0 : rs = ss.str();
1677 [ # # ]: 0 : r = 0;
1678 [ # # ][ # # ]: 0 : } else if (m->cmd[0] == "mon_status") {
1679 [ # # ]: 0 : if (!access_r) {
1680 : 0 : r = -EACCES;
1681 : : rs = "access denied";
1682 : : goto out;
1683 : : }
1684 [ # # ]: 0 : stringstream ss;
1685 [ # # ]: 0 : _mon_status(ss);
1686 : 0 : rs = ss.str();
1687 [ # # ]: 0 : r = 0;
1688 [ # # ][ # # ]: 0 : } else if (m->cmd[0] == "heap") {
1689 [ # # ]: 0 : if (!access_all) {
1690 : 0 : r = -EACCES;
1691 : : rs = "access denied";
1692 : : goto out;
1693 : : }
1694 [ # # ][ # # ]: 0 : if (!ceph_using_tcmalloc())
1695 : : rs = "tcmalloc not enabled, can't use heap profiler commands\n";
1696 : : else {
1697 [ # # ]: 0 : ostringstream ss;
1698 [ # # ]: 0 : ceph_heap_profiler_handle_command(m->cmd, ss);
1699 [ # # ]: 0 : rs = ss.str();
1700 : : }
1701 [ # # ][ # # ]: 0 : } else if (m->cmd[0] == "quorum") {
1702 [ # # ]: 0 : if (!access_all) {
1703 : 0 : r = -EACCES;
1704 : : rs = "access denied";
1705 : : goto out;
1706 : : }
1707 [ # # ][ # # ]: 0 : if (m->cmd[1] == "exit") {
1708 [ # # ]: 0 : reset();
1709 [ # # ]: 0 : start_election();
1710 : : elector.stop_participating();
1711 : : rs = "stopped responding to quorum, initiated new election";
1712 : : r = 0;
1713 [ # # ][ # # ]: 0 : } else if (m->cmd[1] == "enter") {
1714 [ # # ]: 0 : elector.start_participating();
1715 [ # # ]: 0 : reset();
1716 [ # # ]: 0 : start_election();
1717 : : rs = "started responding to quorum, initiated new election";
1718 : : r = 0;
1719 : : } else {
1720 : : rs = "unknown quorum subcommand; use exit or enter";
1721 : : r = -EINVAL;
1722 : : }
1723 : : }
1724 : :
1725 : : out:
1726 [ + - ]: 1 : if (!m->get_source().is_mon()) // don't reply to mon->mon commands
1727 [ + - ]: 1 : reply_command(m, r, rs, rdata, 0);
1728 : : else
1729 : 5 : m->put();
1730 : : }
1731 : :
1732 : 0 : void Monitor::reply_command(MMonCommand *m, int rc, const string &rs, version_t version)
1733 : : {
1734 : 0 : bufferlist rdata;
1735 [ # # ]: 0 : reply_command(m, rc, rs, rdata, version);
1736 : 0 : }
1737 : :
1738 : 5 : void Monitor::reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata, version_t version)
1739 : : {
1740 [ + - ]: 10 : MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
1741 : 5 : reply->set_data(rdata);
1742 : 5 : send_reply(m, reply);
1743 : 5 : m->put();
1744 : 5 : }
1745 : :
1746 : :
1747 : : // ------------------------
1748 : : // request/reply routing
1749 : : //
1750 : : // a client/mds/osd will connect to a random monitor. we need to forward any
1751 : : // messages requiring state updates to the leader, and then route any replies
1752 : : // back via the correct monitor and back to them. (the monitor will not
1753 : : // initiate any connections.)
1754 : :
1755 : 13 : void Monitor::forward_request_leader(PaxosServiceMessage *req)
1756 : : {
1757 : 13 : int mon = get_leader();
1758 : 13 : MonSession *session = 0;
1759 [ + - ]: 13 : if (req->get_connection())
1760 : 13 : session = (MonSession *)req->get_connection()->get_priv();
1761 [ - + ]: 13 : if (req->session_mon >= 0) {
1762 [ # # ][ # # ]: 0 : dout(10) << "forward_request won't double fwd request " << *req << dendl;
[ # # ][ # # ]
[ # # ]
1763 : 0 : req->put();
1764 [ + - ][ + - ]: 13 : } else if (session && !session->closed) {
1765 : 13 : RoutedRequest *rr = new RoutedRequest;
1766 : 13 : rr->tid = ++routed_request_tid;
1767 : 13 : rr->client = req->get_source_inst();
1768 : 13 : encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
1769 : 26 : rr->session = (MonSession *)session->get();
1770 : 13 : routed_requests[rr->tid] = rr;
1771 : 13 : session->routed_request_tids.insert(rr->tid);
1772 : :
1773 [ # # ][ # # ]: 13 : dout(10) << "forward_request " << rr->tid << " request " << *req << dendl;
[ # # ][ # # ]
[ # # ][ - + ]
1774 : :
1775 [ + - ]: 13 : MForward *forward = new MForward(rr->tid, req, rr->session->caps);
1776 : 26 : forward->set_priority(req->get_priority());
1777 : 13 : messenger->send_message(forward, monmap->get_inst(mon));
1778 : : } else {
1779 [ # # ][ # # ]: 0 : dout(10) << "forward_request no session for request " << *req << dendl;
[ # # ][ # # ]
[ # # ]
1780 : 0 : req->put();
1781 : : }
1782 [ + - ]: 13 : if (session)
1783 : 13 : session->put();
1784 : 13 : }
1785 : :
1786 : : //extract the original message and put it into the regular dispatch function
1787 : 22 : void Monitor::handle_forward(MForward *m)
1788 : : {
1789 [ - + ][ # # ]: 22 : dout(10) << "received forwarded message from " << m->client
[ # # ][ # # ]
1790 [ # # ][ # # ]: 0 : << " via " << m->get_source_inst() << dendl;
[ # # ]
1791 : 22 : MonSession *session = (MonSession *)m->get_connection()->get_priv();
1792 [ - + ]: 22 : assert(session);
1793 : :
1794 [ - + ]: 22 : if (!session->caps.check_privileges(PAXOS_MONMAP, MON_CAP_X)) {
1795 [ # # ][ # # ]: 0 : dout(0) << "forward from entity with insufficient caps! "
[ # # ]
1796 [ # # ]: 0 : << session->caps << dendl;
1797 : : } else {
1798 : 22 : Connection *c = new Connection;
1799 [ + - ]: 22 : MonSession *s = new MonSession(m->msg->get_source_inst(), c);
1800 : 22 : c->set_priv(s);
1801 : 22 : c->set_peer_addr(m->client.addr);
1802 : : c->set_peer_type(m->client.name.type());
1803 : :
1804 : 22 : s->caps = m->client_caps;
1805 : 44 : s->proxy_con = m->get_connection()->get();
1806 : 22 : s->proxy_tid = m->tid;
1807 : :
1808 : 22 : PaxosServiceMessage *req = m->msg;
1809 : 22 : m->msg = NULL; // so ~MForward doesn't delete it
1810 : : req->set_connection(c);
1811 : : /* Because this is a special fake connection, we need to break
1812 : : the ref loop between Connection and MonSession differently
1813 : : than we normally do. Here, the Message refers to the Connection
1814 : : which refers to the Session, and nobody else refers to the Connection
1815 : : or the Session. And due to the special nature of this message,
1816 : : nobody refers to the Connection via the Session. So, clear out that
1817 : : half of the ref loop.*/
1818 : 22 : s->con->put();
1819 : 22 : s->con = NULL;
1820 : :
1821 [ - + ][ # # ]: 22 : dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
1822 : :
1823 : 22 : _ms_dispatch(req);
1824 : : }
1825 : 22 : session->put();
1826 : 22 : m->put();
1827 : 22 : }
1828 : :
1829 : 0 : void Monitor::try_send_message(Message *m, const entity_inst_t& to)
1830 : : {
1831 [ # # ][ # # ]: 0 : dout(10) << "try_send_message " << *m << " to " << to << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1832 : :
1833 : 0 : bufferlist bl;
1834 [ # # ]: 0 : encode_message(m, CEPH_FEATURES_ALL, bl); // fixme: assume peers have all features we do.
1835 : :
1836 [ # # ]: 0 : messenger->send_message(m, to);
1837 : :
1838 [ # # ]: 0 : for (int i=0; i<(int)monmap->size(); i++) {
1839 [ # # ]: 0 : if (i != rank)
1840 [ # # ][ # # ]: 0 : messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
[ # # ][ # # ]
[ # # ]
1841 : 0 : }
1842 : 0 : }
1843 : :
1844 : 151 : void Monitor::send_reply(PaxosServiceMessage *req, Message *reply)
1845 : : {
1846 : 151 : MonSession *session = (MonSession*)req->get_connection()->get_priv();
1847 [ - + ]: 151 : if (!session) {
1848 [ # # ][ # # ]: 0 : dout(2) << "send_reply no session, dropping reply " << *reply
[ # # ][ # # ]
1849 [ # # ][ # # ]: 0 : << " to " << req << " " << *req << dendl;
[ # # ][ # # ]
1850 : 0 : reply->put();
1851 : 151 : return;
1852 : : }
1853 [ + + ]: 151 : if (session->proxy_con) {
1854 [ - + ][ # # ]: 11 : dout(15) << "send_reply routing reply to " << req->get_connection()->get_peer_addr()
[ # # ][ # # ]
1855 [ # # ][ # # ]: 0 : << " via mon" << req->session_mon
1856 [ # # ][ # # ]: 0 : << " for request " << *req << dendl;
[ # # ]
1857 : 0 : messenger->send_message(new MRoute(session->proxy_tid, reply),
1858 : 22 : session->proxy_con);
1859 : : } else {
1860 : 140 : messenger->send_message(reply, session->con);
1861 : : }
1862 : 151 : session->put();
1863 : : }
1864 : :
1865 : 0 : void Monitor::no_reply(PaxosServiceMessage *req)
1866 : : {
1867 : 0 : MonSession *session = (MonSession*)req->get_connection()->get_priv();
1868 [ # # ]: 0 : if (!session) {
1869 [ # # ][ # # ]: 0 : dout(2) << "no_reply no session, dropping non-reply to " << req << " " << *req << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
1870 : 0 : return;
1871 : : }
1872 [ # # ]: 0 : if (session->proxy_con) {
1873 [ # # ]: 0 : if (get_quorum_features() & CEPH_FEATURE_MON_NULLROUTE) {
1874 [ # # ][ # # ]: 0 : dout(10) << "no_reply to " << req->get_source_inst() << " via mon." << req->session_mon
[ # # ][ # # ]
[ # # ][ # # ]
1875 [ # # ][ # # ]: 0 : << " for request " << *req << dendl;
[ # # ]
1876 : 0 : messenger->send_message(new MRoute(session->proxy_tid, NULL),
1877 : 0 : session->proxy_con);
1878 : : } else {
1879 [ # # ][ # # ]: 0 : dout(10) << "no_reply no quorum nullroute feature for " << req->get_source_inst() << " via mon." << req->session_mon
[ # # ][ # # ]
[ # # ][ # # ]
1880 [ # # ][ # # ]: 0 : << " for request " << *req << dendl;
[ # # ]
1881 : : }
1882 : : } else {
1883 [ # # ][ # # ]: 0 : dout(10) << "no_reply to " << req->get_source_inst() << " " << *req << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1884 : : }
1885 : 0 : session->put();
1886 : : }
1887 : :
1888 : 11 : void Monitor::handle_route(MRoute *m)
1889 : : {
1890 : 11 : MonSession *session = (MonSession *)m->get_connection()->get_priv();
1891 : : //check privileges
1892 [ + - ][ - + ]: 11 : if (session && !session->caps.check_privileges(PAXOS_MONMAP, MON_CAP_X)) {
[ + - ]
1893 [ # # ][ # # ]: 0 : dout(0) << "MRoute received from entity without appropriate perms! "
[ # # ]
1894 [ # # ]: 0 : << dendl;
1895 : 0 : session->put();
1896 : 0 : m->put();
1897 : 11 : return;
1898 : : }
1899 [ + - ]: 11 : if (m->msg)
1900 [ - + ][ # # ]: 11 : dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1901 : : else
1902 [ # # ][ # # ]: 0 : dout(10) << "handle_route null to " << m->dest << dendl;
[ # # ][ # # ]
[ # # ]
1903 : :
1904 : : // look it up
1905 [ + - ]: 11 : if (m->session_mon_tid) {
1906 [ + - ]: 11 : if (routed_requests.count(m->session_mon_tid)) {
1907 : 11 : RoutedRequest *rr = routed_requests[m->session_mon_tid];
1908 : :
1909 : : // reset payload, in case encoding is dependent on target features
1910 [ + - ]: 11 : if (m->msg) {
1911 : 11 : m->msg->clear_payload();
1912 : 11 : messenger->send_message(m->msg, rr->session->inst);
1913 : 11 : m->msg = NULL;
1914 : : }
1915 : 11 : routed_requests.erase(m->session_mon_tid);
1916 : 11 : rr->session->routed_request_tids.insert(rr->tid);
1917 [ + - ]: 11 : delete rr;
1918 : : } else {
1919 [ # # ][ # # ]: 0 : dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
[ # # ][ # # ]
1920 : : }
1921 : : } else {
1922 [ # # ][ # # ]: 0 : dout(10) << " not a routed request, trying to send anyway" << dendl;
[ # # ][ # # ]
1923 [ # # ]: 0 : if (m->msg) {
1924 : 0 : messenger->lazy_send_message(m->msg, m->dest);
1925 : 0 : m->msg = NULL;
1926 : : }
1927 : : }
1928 : 11 : m->put();
1929 [ + - ]: 11 : if (session)
1930 : 11 : session->put();
1931 : : }
1932 : :
1933 : 17 : void Monitor::resend_routed_requests()
1934 : : {
1935 [ - + ][ # # ]: 17 : dout(10) << "resend_routed_requests" << dendl;
[ # # ][ # # ]
1936 : 17 : int mon = get_leader();
1937 [ + + ]: 26 : for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
1938 : 52 : p != routed_requests.end();
1939 : : p++) {
1940 : 9 : RoutedRequest *rr = p->second;
1941 : :
1942 : 9 : bufferlist::iterator q = rr->request_bl.begin();
1943 : 9 : PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, q);
1944 : :
1945 [ # # ][ # # ]: 9 : dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ - + ]
1946 [ + - ]: 9 : MForward *forward = new MForward(rr->tid, req, rr->session->caps);
1947 : 9 : forward->client = rr->client;
1948 : 18 : forward->set_priority(req->get_priority());
1949 : 9 : messenger->send_message(forward, monmap->get_inst(mon));
1950 : : }
1951 : 17 : }
1952 : :
1953 : 40 : void Monitor::remove_session(MonSession *s)
1954 : : {
1955 [ - + ][ # # ]: 40 : dout(10) << "remove_session " << s << " " << s->inst << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1956 [ - + ]: 40 : assert(!s->closed);
1957 [ + + ]: 53 : for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
1958 : 106 : p != s->routed_request_tids.end();
1959 : : p++) {
1960 [ + + ]: 13 : if (routed_requests.count(*p)) {
1961 : 2 : RoutedRequest *rr = routed_requests[*p];
1962 [ # # ][ # # ]: 2 : dout(10) << " dropping routed request " << rr->tid << dendl;
[ # # ][ - + ]
1963 [ + - ]: 2 : delete rr;
1964 : 2 : routed_requests.erase(*p);
1965 : : }
1966 : : }
1967 : 40 : s->con->set_priv(NULL);
1968 : 40 : session_map.remove_session(s);
1969 : 40 : }
1970 : :
1971 : 3 : void Monitor::remove_all_sessions()
1972 : : {
1973 [ + + ]: 23 : while (!session_map.sessions.empty()) {
1974 : 17 : MonSession *s = session_map.sessions.front();
1975 : 17 : remove_session(s);
1976 : : }
1977 : 3 : }
1978 : :
1979 : 0 : void Monitor::send_command(const entity_inst_t& inst,
1980 : : const vector<string>& com, version_t version)
1981 : : {
1982 [ # # ][ # # ]: 0 : dout(10) << "send_command " << inst << "" << com << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1983 : 0 : MMonCommand *c = new MMonCommand(monmap->fsid, version);
1984 : 0 : c->cmd = com;
1985 : 0 : try_send_message(c, inst);
1986 : 0 : }
1987 : :
1988 : :
1989 : 0 : void Monitor::stop_cluster()
1990 : : {
1991 [ # # ][ # # ]: 0 : dout(0) << "stop_cluster -- initiating shutdown" << dendl;
[ # # ][ # # ]
1992 : 0 : mdsmon()->do_stop();
1993 : 0 : }
1994 : :
1995 : :
1996 : 1444 : bool Monitor::_ms_dispatch(Message *m)
1997 : : {
1998 : 1444 : bool ret = true;
1999 : :
2000 [ - + ]: 1444 : if (state == STATE_SHUTDOWN) {
2001 : 0 : m->put();
2002 : : return true;
2003 : : }
2004 : :
2005 : 1444 : Connection *connection = m->get_connection();
2006 : 1444 : MonSession *s = NULL;
2007 : 1444 : bool reuse_caps = false;
2008 : 1444 : MonCaps caps;
2009 [ + - ]: 1444 : EntityName entity_name;
2010 : : bool src_is_mon;
2011 : :
2012 [ + - ][ + + ]: 1444 : src_is_mon = !connection || (connection->get_peer_type() & CEPH_ENTITY_TYPE_MON);
2013 : :
2014 [ + - ]: 1444 : if (connection) {
2015 [ + - ][ - + ]: 1444 : dout(20) << "have connection" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
2016 [ + - ]: 1444 : s = (MonSession *)connection->get_priv();
2017 [ + + ][ - + ]: 1444 : if (s && s->closed) {
2018 [ # # ]: 0 : caps = s->caps;
2019 : 0 : reuse_caps = true;
2020 : 0 : s->put();
2021 : : s = NULL;
2022 : : }
2023 [ + + ]: 1444 : if (!s) {
2024 [ + + ][ + + ]: 75 : if (!exited_quorum.is_zero()
[ + + ]
2025 : : && !src_is_mon) {
2026 : : /**
2027 : : * Wait list the new session until we're in the quorum, assuming it's
2028 : : * sufficiently new.
2029 : : * tick() will periodically send them back through so we can send
2030 : : * the client elsewhere if we don't think we're getting back in.
2031 : : *
2032 : : * But we whitelist a few sorts of messages:
2033 : : * 1) Monitors can talk to us at any time, of course.
2034 : : * 2) auth messages. It's unlikely to go through much faster, but
2035 : : * it's possible we've just lost our quorum status and we want to take...
2036 : : * 3) command messages. We want to accept these under all possible
2037 : : * circumstances.
2038 : : */
2039 [ + - ]: 35 : utime_t too_old = ceph_clock_now(g_ceph_context);
2040 : 35 : too_old -= g_ceph_context->_conf->mon_lease;
2041 [ + + ][ + + ]: 67 : if (m->get_recv_stamp() > too_old
[ + + ]
2042 : : && connection->is_connected()) {
2043 [ + - ][ + - ]: 29 : dout(5) << "waitlisting message " << *m
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ]
2044 [ + - ][ + - ]: 58 : << " until we get in quorum" << dendl;
[ + - ]
2045 [ + - ][ + - ]: 29 : maybe_wait_for_quorum.push_back(new C_RetryMessage(this, m));
2046 : : } else {
2047 [ + - ][ + - ]: 6 : dout(1) << "discarding message " << *m
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ]
2048 [ + - ]: 6 : << " and sending client elsewhere; we are not in quorum"
2049 [ + - ][ + - ]: 6 : << dendl;
2050 [ + - ]: 6 : messenger->mark_down(connection);
2051 : 6 : m->put();
2052 : : }
2053 : : return true;
2054 : : }
2055 [ + - ][ - + ]: 40 : dout(10) << "do not have session, making new one" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
2056 : 40 : s = session_map.new_session(m->get_source_inst(), m->get_connection());
2057 [ + - ]: 40 : m->get_connection()->set_priv(s->get());
2058 [ + - ][ - + ]: 40 : dout(10) << "ms_dispatch new session " << s << " for " << s->inst << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
2059 : :
2060 [ + + ]: 40 : if (m->get_connection()->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
2061 [ + - ][ - + ]: 31 : dout(10) << "setting timeout on session" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
2062 : : // set an initial timeout here, so we will trim this session even if they don't
2063 : : // do anything.
2064 [ + - ]: 31 : s->until = ceph_clock_now(g_ceph_context);
2065 : 31 : s->until += g_conf->mon_subscribe_interval;
2066 : : } else {
2067 : : //give it monitor caps; the peer type has been authenticated
2068 : 9 : reuse_caps = false;
2069 [ + - ][ + - ]: 18 : dout(5) << "setting monitor caps on this connection" << dendl;
[ + - ][ + - ]
[ + - ][ + - ]
[ + - ][ + - ]
2070 [ + - ]: 9 : if (!s->caps.allow_all) //but no need to repeatedly copy
2071 [ + - ]: 9 : s->caps = *mon_caps;
2072 : : }
2073 [ - + ]: 40 : if (reuse_caps)
2074 [ # # ]: 0 : s->caps = caps;
2075 : : } else {
2076 [ + - ][ - + ]: 1369 : dout(20) << "ms_dispatch existing session " << s << " for " << s->inst << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
2077 : : }
2078 [ + + ]: 1409 : if (s->auth_handler) {
2079 : 112 : entity_name = s->auth_handler->get_entity_name();
2080 : : }
2081 : : }
2082 : :
2083 [ + - ]: 1409 : if (s)
2084 [ + - ][ - + ]: 1409 : dout(20) << " caps " << s->caps.get_str() << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
2085 : :
2086 : : {
2087 [ + - + + : 1409 : switch (m->get_type()) {
+ + + - +
+ + - + +
- + + + +
- ]
2088 : :
2089 : : case MSG_ROUTE:
2090 [ + - ]: 11 : handle_route((MRoute*)m);
2091 : : break;
2092 : :
2093 : : // misc
2094 : : case CEPH_MSG_MON_GET_MAP:
2095 [ # # ]: 0 : handle_mon_get_map((MMonGetMap*)m);
2096 : : break;
2097 : :
2098 : : case CEPH_MSG_MON_GET_VERSION:
2099 [ + - ]: 2 : handle_get_version((MMonGetVersion*)m);
2100 : : break;
2101 : :
2102 : : case MSG_MON_COMMAND:
2103 [ + - ]: 5 : handle_command((MMonCommand*)m);
2104 : : break;
2105 : :
2106 : : case CEPH_MSG_MON_SUBSCRIBE:
2107 : : /* FIXME: check what's being subscribed, filter accordingly */
2108 [ + - ]: 19 : handle_subscribe((MMonSubscribe*)m);
2109 : : break;
2110 : :
2111 : : case MSG_MON_PROBE:
2112 [ + - ]: 60 : handle_probe((MMonProbe*)m);
2113 : : break;
2114 : :
2115 : : // OSDs
2116 : : case MSG_OSD_FAILURE:
2117 : : case MSG_OSD_BOOT:
2118 : : case MSG_OSD_ALIVE:
2119 : : case MSG_OSD_PGTEMP:
2120 [ + - ]: 13 : paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m);
2121 : : break;
2122 : :
2123 : : case MSG_REMOVE_SNAPS:
2124 [ # # ]: 0 : paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m);
2125 : : break;
2126 : :
2127 : : // MDSs
2128 : : case MSG_MDS_BEACON:
2129 : : case MSG_MDS_OFFLOAD_TARGETS:
2130 [ + - ]: 22 : paxos_service[PAXOS_MDSMAP]->dispatch((PaxosServiceMessage*)m);
2131 : : break;
2132 : :
2133 : : // auth
2134 : : case MSG_MON_GLOBAL_ID:
2135 : : case CEPH_MSG_AUTH:
2136 : : /* no need to check caps here */
2137 [ + - ]: 55 : paxos_service[PAXOS_AUTH]->dispatch((PaxosServiceMessage*)m);
2138 : : break;
2139 : :
2140 : : // pg
2141 : : case CEPH_MSG_STATFS:
2142 : : case MSG_PGSTATS:
2143 : : case MSG_GETPOOLSTATS:
2144 [ + - ]: 25 : paxos_service[PAXOS_PGMAP]->dispatch((PaxosServiceMessage*)m);
2145 : : break;
2146 : :
2147 : : case CEPH_MSG_POOLOP:
2148 [ # # ]: 0 : paxos_service[PAXOS_OSDMAP]->dispatch((PaxosServiceMessage*)m);
2149 : : break;
2150 : :
2151 : : // log
2152 : : case MSG_LOG:
2153 [ + - ]: 92 : paxos_service[PAXOS_LOG]->dispatch((PaxosServiceMessage*)m);
2154 : : break;
2155 : :
2156 : : case MSG_LOGACK:
2157 [ + - ]: 55 : clog.handle_log_ack((MLogAck*)m);
2158 : : break;
2159 : :
2160 : : // monmap
2161 : : case MSG_MON_JOIN:
2162 [ # # ]: 0 : paxos_service[PAXOS_MONMAP]->dispatch((PaxosServiceMessage*)m);
2163 : : break;
2164 : :
2165 : : // paxos
2166 : : case MSG_MON_PAXOS:
2167 : : {
2168 : 931 : MMonPaxos *pm = (MMonPaxos*)m;
2169 [ - + ][ # # ]: 931 : if (!src_is_mon &&
[ - + ]
2170 [ # # ]: 0 : !s->caps.check_privileges(PAXOS_MONMAP, MON_CAP_X)) {
2171 : : //can't send these!
2172 : 0 : pm->put();
2173 : : break;
2174 : : }
2175 : :
2176 : : // sanitize
2177 [ - + ]: 931 : if (pm->epoch > get_epoch()) {
2178 [ # # ]: 0 : bootstrap();
2179 : 0 : pm->put();
2180 : : break;
2181 : : }
2182 [ + + ]: 931 : if (pm->epoch != get_epoch()) {
2183 : 32 : pm->put();
2184 : : break;
2185 : : }
2186 : :
2187 : : // send it to the right paxos instance
2188 [ - + ]: 899 : assert(pm->machine_id < PAXOS_NUM);
2189 [ + - ][ + - ]: 899 : Paxos *p = get_paxos_by_name(get_paxos_name(pm->machine_id));
[ + - ]
2190 [ + - ]: 899 : p->dispatch((PaxosServiceMessage*)m);
2191 : :
2192 : : // make sure service finds out about any state changes
2193 [ + + ]: 899 : if (p->is_active())
2194 [ + - ]: 434 : paxos_service[p->machine_id]->update_from_paxos();
2195 : : }
2196 : : break;
2197 : :
2198 : : // elector messages
2199 : : case MSG_MON_ELECTION:
2200 : : //check privileges here for simplicity
2201 [ + - ][ + - ]: 132 : if (s &&
[ - + ]
2202 [ + - ]: 66 : !s->caps.check_privileges(PAXOS_MONMAP, MON_CAP_X)) {
2203 [ # # ][ # # ]: 0 : dout(0) << "MMonElection received from entity without enough caps!"
[ # # ][ # # ]
[ # # ][ # # ]
2204 [ # # ][ # # ]: 0 : << s->caps << dendl;
2205 : 0 : m->put();
2206 : : break;
2207 : : }
2208 [ + + ][ - + ]: 66 : if (!is_probing() && !is_slurping()) {
[ + + ]
2209 [ + - ]: 63 : elector.dispatch(m);
2210 : : } else {
2211 : 3 : m->put();
2212 : : }
2213 : : break;
2214 : :
2215 : : case MSG_FORWARD:
2216 [ + - ]: 22 : handle_forward((MForward *)m);
2217 : : break;
2218 : :
2219 : : case MSG_TIMECHECK:
2220 [ + - ]: 31 : handle_timecheck((MTimeCheck *)m);
2221 : : break;
2222 : :
2223 : : default:
2224 : : ret = false;
2225 : : }
2226 : : }
2227 [ + - ]: 1409 : if (s) {
2228 : 1409 : s->put();
2229 : : }
2230 : :
2231 : 1444 : return ret;
2232 : : }
2233 : :
2234 : 21 : void Monitor::timecheck_cleanup()
2235 : : {
2236 : 21 : timecheck_round = 0;
2237 : :
2238 [ + + ]: 21 : if (timecheck_event) {
2239 : 5 : timer.cancel_event(timecheck_event);
2240 : 5 : timecheck_event = NULL;
2241 : : }
2242 : :
2243 [ + + ]: 21 : if (timecheck_waiting.size() > 0)
2244 : 1 : timecheck_waiting.clear();
2245 : 21 : timecheck_skews.clear();
2246 : 21 : timecheck_latencies.clear();
2247 : 21 : }
2248 : :
2249 : 5 : void Monitor::timecheck_report()
2250 : : {
2251 [ - + ][ # # ]: 5 : dout(10) << __func__ << dendl;
[ # # ][ # # ]
2252 [ - + ]: 5 : assert(is_leader());
2253 [ - + ]: 5 : if (monmap->size() == 1) {
2254 : 0 : assert(0 == "We are alone; we shouldn't have gotten here!");
2255 : : return;
2256 : : }
2257 : :
2258 [ - + ]: 5 : assert(timecheck_latencies.size() == timecheck_skews.size());
2259 [ + + ]: 20 : for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
2260 [ + + ]: 15 : if (monmap->get_name(*q) == name)
2261 : 5 : continue;
2262 : :
2263 [ + - ]: 10 : MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
2264 : 10 : m->epoch = get_epoch();
2265 : 10 : m->round = timecheck_round;
2266 : :
2267 [ + + ]: 40 : for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin(); it != timecheck_skews.end(); ++it) {
2268 : 30 : double skew = it->second;
2269 : 30 : double latency = timecheck_latencies[it->first];
2270 : :
2271 : 30 : m->skews[it->first] = skew;
2272 : 30 : m->latencies[it->first] = latency;
2273 : :
2274 [ # # ][ # # ]: 30 : dout(10) << __func__ << " " << it->first
[ # # ][ # # ]
[ - + ]
2275 [ # # ]: 0 : << " latency " << latency
2276 [ # # ][ # # ]: 0 : << " skew " << skew << dendl;
2277 : : }
2278 : 10 : entity_inst_t inst = monmap->get_inst(*q);
2279 [ # # ][ # # ]: 10 : dout(10) << __func__ << " send report to " << inst << dendl;
[ # # ][ # # ]
[ # # ][ - + ]
2280 : 10 : messenger->send_message(m, inst);
2281 : : }
2282 : 5 : }
2283 : :
2284 : 6 : void Monitor::timecheck()
2285 : : {
2286 [ - + ][ # # ]: 6 : dout(10) << __func__ << dendl;
[ # # ][ # # ]
2287 [ - + ]: 6 : assert(is_leader());
2288 : :
2289 [ - + ]: 6 : if (monmap->size() == 1) {
2290 : 0 : assert(0 == "We are alone; this shouldn't have been scheduled!");
2291 : : return;
2292 : : }
2293 : :
2294 : 6 : timecheck_epoch = get_epoch();
2295 : 6 : timecheck_round++;
2296 : :
2297 [ - + ][ # # ]: 6 : dout(10) << __func__ << " start timecheck epoch " << timecheck_epoch
[ # # ][ # # ]
2298 [ # # ][ # # ]: 0 : << " round " << timecheck_round << dendl;
2299 : :
2300 : : // we are at the eye of the storm; the point of reference
2301 : 6 : timecheck_skews[monmap->get_inst(name)] = 0.0;
2302 : 6 : timecheck_latencies[monmap->get_inst(name)] = 0.0;
2303 : :
2304 [ + + ]: 23 : for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
2305 [ + + ]: 17 : if (monmap->get_name(*it) == name)
2306 : 6 : continue;
2307 : :
2308 : 11 : entity_inst_t inst = monmap->get_inst(*it);
2309 : 11 : utime_t curr_time = ceph_clock_now(g_ceph_context);
2310 : 11 : timecheck_waiting[inst] = curr_time;
2311 [ + - ]: 11 : MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
2312 : 11 : m->epoch = get_epoch();
2313 : 11 : m->round = timecheck_round;
2314 [ - + ][ # # ]: 11 : dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
2315 : 11 : messenger->send_message(m, inst);
2316 : : }
2317 : :
2318 [ - + ][ # # ]: 6 : dout(10) << __func__ << " setting up next event and timeout" << dendl;
[ # # ][ # # ]
[ # # ]
2319 : 6 : timecheck_event = new C_TimeCheck(this);
2320 : :
2321 : 6 : timer.add_event_after(g_conf->mon_timecheck_interval, timecheck_event);
2322 : 6 : }
2323 : :
2324 : 13 : health_status_t Monitor::timecheck_status(ostringstream &ss,
2325 : : const double skew_bound,
2326 : : const double latency)
2327 : : {
2328 : 13 : health_status_t status = HEALTH_OK;
2329 [ + - ]: 13 : double abs_skew = (skew_bound > 0 ? skew_bound : -skew_bound);
2330 [ - + ]: 13 : assert(latency >= 0);
2331 : :
2332 [ - + ]: 13 : if (abs_skew > g_conf->mon_clock_drift_allowed) {
2333 : 0 : status = HEALTH_WARN;
2334 : 0 : ss << "clock skew " << abs_skew << "s"
2335 : 0 : << " > max " << g_conf->mon_clock_drift_allowed << "s";
2336 : : }
2337 : :
2338 : 13 : return status;
2339 : : }
2340 : :
2341 : 10 : void Monitor::handle_timecheck_leader(MTimeCheck *m)
2342 : : {
2343 [ - + ][ # # ]: 10 : dout(10) << __func__ << " " << *m << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
2344 : : /* handles PONG's */
2345 [ - + ]: 10 : assert(m->op == MTimeCheck::OP_PONG);
2346 [ - + ]: 10 : assert(m->epoch == timecheck_epoch);
2347 : :
2348 : 10 : entity_inst_t other = m->get_source_inst();
2349 : :
2350 [ - + ]: 10 : if (m->round < timecheck_round) {
2351 [ # # ][ # # ]: 0 : dout(1) << __func__ << " got old round " << m->round
[ # # ][ # # ]
2352 [ # # ][ # # ]: 0 : << " from " << other
2353 [ # # ][ # # ]: 0 : << " curr " << timecheck_round << " -- discard" << dendl;
[ # # ]
2354 : : return;
2355 : : }
2356 : :
2357 : 10 : utime_t curr_time = ceph_clock_now(g_ceph_context);
2358 : :
2359 [ - + ]: 10 : assert(timecheck_waiting.count(other) > 0);
2360 : 10 : utime_t timecheck_sent = timecheck_waiting[other];
2361 : 10 : timecheck_waiting.erase(other);
2362 [ - + ]: 10 : if (curr_time < timecheck_sent) {
2363 : : // our clock was readjusted -- drop everything until it all makes sense.
2364 [ # # ][ # # ]: 0 : dout(1) << __func__ << " our clock was readjusted --"
[ # # ][ # # ]
2365 [ # # ]: 0 : << " bump round and drop current check"
2366 [ # # ]: 0 : << dendl;
2367 : 0 : timecheck_round++;
2368 : 0 : timecheck_waiting.clear();
2369 : : return;
2370 : : }
2371 : :
2372 : : /* update peer latencies */
2373 : 20 : double latency = (double)(curr_time - timecheck_sent);
2374 : :
2375 [ + - ]: 10 : if (timecheck_latencies.count(other) == 0)
2376 : 10 : timecheck_latencies[other] = latency;
2377 : : else {
2378 : 0 : double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
2379 : 0 : timecheck_latencies[other] = avg_latency;
2380 : : }
2381 : :
2382 : : /*
2383 : : * update skews
2384 : : *
2385 : : * some nasty thing goes on if we were to do 'a - b' between two utime_t,
2386 : : * and 'a' happens to be lower than 'b'; so we use double instead.
2387 : : *
2388 : : * latency is always expected to be >= 0.
2389 : : *
2390 : : * delta, the difference between theirs timestamp and ours, may either be
2391 : : * lower or higher than 0; will hardly ever be 0.
2392 : : *
2393 : : * The absolute skew is the absolute delta minus the latency, which is
2394 : : * taken as a whole instead of an rtt given that there is some queueing
2395 : : * and dispatch times involved and it's hard to assess how long exactly
2396 : : * it took for the message to travel to the other side and be handled. So
2397 : : * we call it a bounded skew, the worst case scenario.
2398 : : *
2399 : : * Now, to math!
2400 : : *
2401 : : * Given that the latency is always positive, we can establish that the
2402 : : * bounded skew will be:
2403 : : *
2404 : : * 1. positive if the absolute delta is higher than the latency and
2405 : : * delta is positive
2406 : : * 2. negative if the absolute delta is higher than the latency and
2407 : : * delta is negative.
2408 : : * 3. zero if the absolute delta is lower than the latency.
2409 : : *
2410 : : * On 3. we make a judgement call and treat the skew as non-existent.
2411 : : * This is because that, if the absolute delta is lower than the
2412 : : * latency, then the apparently existing skew is nothing more than a
2413 : : * side-effect of the high latency at work.
2414 : : *
2415 : : * This may not be entirely true though, as a severely skewed clock
2416 : : * may be masked by an even higher latency, but with high latencies
2417 : : * we probably have worse issues to deal with than just skewed clocks.
2418 : : */
2419 [ - + ]: 10 : assert(latency >= 0);
2420 : :
2421 : 30 : double delta = ((double) m->timestamp) - ((double) curr_time);
2422 [ + + ]: 10 : double abs_delta = (delta > 0 ? delta : -delta);
2423 : 10 : double skew_bound = abs_delta - latency;
2424 [ - + ]: 10 : if (skew_bound < 0)
2425 : : skew_bound = 0;
2426 [ # # ]: 0 : else if (delta < 0)
2427 : 0 : skew_bound = -skew_bound;
2428 : :
2429 : 10 : ostringstream ss;
2430 [ + - ]: 10 : health_status_t status = timecheck_status(ss, skew_bound, latency);
2431 [ - + ]: 10 : if (status == HEALTH_ERR)
2432 [ # # ][ # # ]: 0 : clog.error() << other << " " << ss.str() << "\n";
[ # # ][ # # ]
2433 [ - + ]: 10 : else if (status == HEALTH_WARN)
2434 [ # # ][ # # ]: 0 : clog.warn() << other << " " << ss.str() << "\n";
[ # # ][ # # ]
2435 : :
2436 [ + - ][ - + ]: 10 : dout(10) << __func__ << " from " << other << " ts " << m->timestamp
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
2437 [ # # ][ # # ]: 0 : << " delta " << delta << " skew_bound " << skew_bound
2438 [ # # ][ # # ]: 0 : << " latency " << latency << dendl;
[ # # ]
2439 : :
2440 [ + - ]: 10 : if (timecheck_skews.count(other) == 0) {
2441 [ + - ]: 10 : timecheck_skews[other] = skew_bound;
2442 : : } else {
2443 [ # # ][ # # ]: 0 : timecheck_skews[other] = (timecheck_skews[other]*0.8)+(skew_bound*0.2);
2444 : : }
2445 : :
2446 [ + + ]: 10 : if (timecheck_waiting.size() == 0)
2447 [ + - ]: 10 : timecheck_report();
2448 : : }
2449 : :
2450 : 20 : void Monitor::handle_timecheck_peon(MTimeCheck *m)
2451 : : {
2452 [ - + ][ # # ]: 20 : dout(10) << __func__ << " " << *m << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
2453 : :
2454 [ - + ]: 20 : assert(is_peon());
2455 [ - + ]: 20 : assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
2456 : :
2457 [ - + ]: 20 : if (m->epoch != get_epoch()) {
2458 [ # # ][ # # ]: 0 : dout(1) << __func__ << " got wrong epoch "
[ # # ][ # # ]
2459 [ # # ]: 0 : << "(ours " << get_epoch()
2460 [ # # ][ # # ]: 0 : << " theirs: " << m->epoch << ") -- discarding" << dendl;
[ # # ]
2461 : : return;
2462 : : }
2463 : :
2464 [ + - ][ + + ]: 20 : if ((m->round < timecheck_round)
[ - + ]
2465 : : || (m->round == timecheck_round && m->op != MTimeCheck::OP_REPORT)) {
2466 [ # # ][ # # ]: 0 : dout(1) << __func__ << " got old round " << m->round
[ # # ][ # # ]
2467 [ # # ][ # # ]: 0 : << " current " << timecheck_round << " -- discarding" << dendl;
[ # # ]
2468 : : return;
2469 : : }
2470 : :
2471 [ + + ]: 20 : if (m->op == MTimeCheck::OP_REPORT) {
2472 : 10 : timecheck_latencies.swap(m->latencies);
2473 : 10 : timecheck_skews.swap(m->skews);
2474 : : return;
2475 : : }
2476 : :
2477 : 10 : timecheck_round = m->round;
2478 : :
2479 [ + - ]: 10 : MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
2480 : 10 : utime_t curr_time = ceph_clock_now(g_ceph_context);
2481 : 10 : reply->timestamp = curr_time;
2482 : 10 : reply->epoch = m->epoch;
2483 : 10 : reply->round = m->round;
2484 [ # # ][ # # ]: 10 : dout(10) << __func__ << " send " << *m
[ # # ][ # # ]
[ - + ]
2485 [ # # ][ # # ]: 0 : << " to " << m->get_source_inst() << dendl;
[ # # ]
2486 : 20 : messenger->send_message(reply, m->get_connection());
2487 : : }
2488 : :
2489 : 31 : void Monitor::handle_timecheck(MTimeCheck *m)
2490 : : {
2491 [ - + ][ # # ]: 31 : dout(10) << __func__ << " " << *m << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
2492 : :
2493 [ + + ]: 31 : if (is_leader()) {
2494 [ - + ]: 10 : if (m->op != MTimeCheck::OP_PONG) {
2495 [ # # ][ # # ]: 0 : dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
[ # # ][ # # ]
[ # # ]
2496 : : } else {
2497 : 10 : handle_timecheck_leader(m);
2498 : : }
2499 [ + + ]: 21 : } else if (is_peon()) {
2500 [ - + ]: 20 : if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
2501 [ # # ][ # # ]: 0 : dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
[ # # ][ # # ]
[ # # ]
2502 : : } else {
2503 : 20 : handle_timecheck_peon(m);
2504 : : }
2505 : : } else {
2506 [ + - ][ + - ]: 2 : dout(1) << __func__ << " drop unexpected msg" << dendl;
[ + - ][ + - ]
[ + - ]
2507 : : }
2508 : 31 : m->put();
2509 : 31 : }
2510 : :
2511 : 19 : void Monitor::handle_subscribe(MMonSubscribe *m)
2512 : : {
2513 [ - + ][ # # ]: 19 : dout(10) << "handle_subscribe " << *m << dendl;
[ # # ][ # # ]
[ # # ]
2514 : :
2515 : 19 : bool reply = false;
2516 : :
2517 : 19 : MonSession *s = (MonSession *)m->get_connection()->get_priv();
2518 [ - + ]: 19 : if (!s) {
2519 [ # # ][ # # ]: 0 : dout(10) << " no session, dropping" << dendl;
[ # # ][ # # ]
2520 : 0 : m->put();
2521 : 19 : return;
2522 : : }
2523 : :
2524 : 19 : s->until = ceph_clock_now(g_ceph_context);
2525 : 19 : s->until += g_conf->mon_subscribe_interval;
2526 [ + + ]: 53 : for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
2527 : 106 : p != m->what.end();
2528 : : p++) {
2529 : : // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
2530 [ + + ]: 34 : if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
2531 : 22 : reply = true;
2532 : :
2533 : : session_map.add_update_sub(s, p->first, p->second.start,
2534 : : p->second.flags & CEPH_SUBSCRIBE_ONETIME,
2535 : 34 : m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
2536 : :
2537 [ + + ]: 34 : if (p->first == "mdsmap") {
2538 [ + - ]: 3 : if ((int)s->caps.check_privileges(PAXOS_MDSMAP, MON_CAP_R)) {
2539 [ + - ][ + - ]: 3 : mdsmon()->check_sub(s->sub_map["mdsmap"]);
2540 : : }
2541 [ + + ]: 31 : } else if (p->first == "osdmap") {
2542 [ + - ]: 6 : if ((int)s->caps.check_privileges(PAXOS_OSDMAP, MON_CAP_R)) {
2543 [ + - ][ + - ]: 6 : osdmon()->check_sub(s->sub_map["osdmap"]);
2544 : : }
2545 [ + + ]: 25 : } else if (p->first == "osd_pg_creates") {
2546 [ + - ]: 6 : if ((int)s->caps.check_privileges(PAXOS_OSDMAP, MON_CAP_W)) {
2547 [ + - ][ + - ]: 6 : pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
2548 : : }
2549 [ + - ]: 19 : } else if (p->first == "monmap") {
2550 [ + - ][ + - ]: 19 : check_sub(s->sub_map["monmap"]);
2551 [ # # ]: 0 : } else if (logmon()->sub_name_to_id(p->first) >= 0) {
2552 : 0 : logmon()->check_sub(s->sub_map[p->first]);
2553 : : }
2554 : : }
2555 : :
2556 : : // ???
2557 : :
2558 [ + - ]: 19 : if (reply)
2559 : 0 : messenger->send_message(new MMonSubscribeAck(monmap->get_fsid(), (int)g_conf->mon_subscribe_interval),
2560 : 38 : m->get_source_inst());
2561 : :
2562 : 19 : s->put();
2563 : 19 : m->put();
2564 : : }
2565 : :
2566 : 2 : void Monitor::handle_get_version(MMonGetVersion *m)
2567 : : {
2568 [ - + ][ # # ]: 2 : dout(10) << "handle_get_version " << *m << dendl;
[ # # ][ # # ]
[ # # ]
2569 : :
2570 : 2 : MonSession *s = (MonSession *)m->get_connection()->get_priv();
2571 [ - + ]: 2 : if (!s) {
2572 [ # # ][ # # ]: 0 : dout(10) << " no session, dropping" << dendl;
[ # # ][ # # ]
2573 : 0 : m->put();
2574 : 2 : return;
2575 : : }
2576 : :
2577 : 2 : MMonGetVersionReply *reply = new MMonGetVersionReply();
2578 : 2 : reply->handle = m->handle;
2579 [ - + ]: 2 : if (m->what == "mdsmap") {
2580 : 0 : reply->version = mdsmon()->mdsmap.get_epoch();
2581 : 0 : reply->oldest_version = mdsmon()->paxos->get_first_committed();
2582 [ + - ]: 2 : } else if (m->what == "osdmap") {
2583 : 2 : reply->version = osdmon()->osdmap.get_epoch();
2584 : 2 : reply->oldest_version = osdmon()->paxos->get_first_committed();
2585 [ # # ]: 0 : } else if (m->what == "monmap") {
2586 : 0 : reply->version = monmap->get_epoch();
2587 : 0 : reply->oldest_version = monmon()->paxos->get_first_committed();
2588 : : } else {
2589 [ # # ][ # # ]: 0 : derr << "invalid map type " << m->what << dendl;
[ # # ][ # # ]
[ # # ]
2590 : : }
2591 : :
2592 : 2 : messenger->send_message(reply, m->get_source_inst());
2593 : :
2594 : 2 : s->put();
2595 : 2 : m->put();
2596 : : }
2597 : :
2598 : 51 : bool Monitor::ms_handle_reset(Connection *con)
2599 : : {
2600 [ - + ][ # # ]: 51 : dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
2601 : :
2602 [ + - ]: 51 : if (state == STATE_SHUTDOWN)
2603 : : return false;
2604 : :
2605 : : // ignore lossless monitor sessions
2606 [ + - ]: 51 : if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
2607 : : return false;
2608 : :
2609 : 51 : MonSession *s = (MonSession *)con->get_priv();
2610 [ + + ]: 51 : if (!s)
2611 : : return false;
2612 : :
2613 : 23 : Mutex::Locker l(lock);
2614 : :
2615 [ - + ][ # # ]: 23 : dout(10) << "reset/close on session " << s->inst << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ + - ]
2616 [ + - ]: 23 : if (!s->closed)
2617 [ + - ]: 23 : remove_session(s);
2618 : 23 : s->put();
2619 : 23 : return true;
2620 : : }
2621 : :
2622 : 0 : void Monitor::check_subs()
2623 : : {
2624 : 0 : string type = "monmap";
2625 [ # # ]: 0 : if (session_map.subs.count(type) == 0)
2626 : 0 : return;
2627 [ # # ]: 0 : xlist<Subscription*>::iterator p = session_map.subs[type]->begin();
2628 [ # # ]: 0 : while (!p.end()) {
2629 : 0 : Subscription *sub = *p;
2630 : : ++p;
2631 [ # # ]: 0 : check_sub(sub);
2632 : 0 : }
2633 : : }
2634 : :
2635 : 19 : void Monitor::check_sub(Subscription *sub)
2636 : : {
2637 [ - + ][ # # ]: 19 : dout(10) << "check_sub monmap next " << sub->next << " have " << monmap->get_epoch() << dendl;
[ # # ][ # # ]
[ # # ]
2638 [ + + ]: 19 : if (sub->next <= monmap->get_epoch()) {
2639 : 10 : send_latest_monmap(sub->session->con);
2640 [ - + ]: 10 : if (sub->onetime)
2641 : 0 : session_map.remove_sub(sub);
2642 : : else
2643 : 10 : sub->next = monmap->get_epoch() + 1;
2644 : : }
2645 : 19 : }
2646 : :
2647 : :
2648 : : // -----
2649 : :
2650 : 23 : void Monitor::send_latest_monmap(Connection *con)
2651 : : {
2652 : 23 : bufferlist bl;
2653 [ + - ]: 23 : monmap->encode(bl, con->get_features());
2654 [ + - ][ + - ]: 46 : messenger->send_message(new MMonMap(bl), con);
2655 : 23 : }
2656 : :
2657 : 0 : void Monitor::handle_mon_get_map(MMonGetMap *m)
2658 : : {
2659 [ # # ][ # # ]: 0 : dout(10) << "handle_mon_get_map" << dendl;
[ # # ][ # # ]
2660 : 0 : send_latest_monmap(m->get_connection());
2661 : 0 : m->put();
2662 : 0 : }
2663 : :
2664 : :
2665 : :
2666 : :
2667 : :
2668 : :
2669 : :
2670 : : /************ TICK ***************/
2671 : :
2672 : 64 : class C_Mon_Tick : public Context {
2673 : : Monitor *mon;
2674 : : public:
2675 : 128 : C_Mon_Tick(Monitor *m) : mon(m) {}
2676 : 61 : void finish(int r) {
2677 : 61 : mon->tick();
2678 : 61 : }
2679 : : };
2680 : :
2681 : 64 : void Monitor::new_tick()
2682 : : {
2683 : 64 : C_Mon_Tick *ctx = new C_Mon_Tick(this);
2684 : 64 : timer.add_event_after(g_conf->mon_tick_interval, ctx);
2685 : 64 : }
2686 : :
2687 : 61 : void Monitor::tick()
2688 : : {
2689 : : // ok go.
2690 [ - + ][ # # ]: 61 : dout(11) << "tick" << dendl;
[ # # ][ # # ]
2691 : :
2692 [ + - ]: 61 : if (!is_slurping()) {
2693 [ + + ]: 427 : for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++) {
2694 : 366 : (*p)->tick();
2695 : : }
2696 : : }
2697 : :
2698 : : // trim sessions
2699 : 61 : utime_t now = ceph_clock_now(g_ceph_context);
2700 : 61 : xlist<MonSession*>::iterator p = session_map.sessions.begin();
2701 [ + + ]: 397 : while (!p.end()) {
2702 : 336 : MonSession *s = *p;
2703 : : ++p;
2704 : :
2705 : : // don't trim monitors
2706 [ + + ]: 336 : if (s->inst.name.is_mon())
2707 : 183 : continue;
2708 : :
2709 [ + - ][ + - ]: 306 : if (!s->until.is_zero() && s->until < now) {
[ - + ]
2710 [ # # ][ # # ]: 0 : dout(10) << " trimming session " << s->inst
[ # # ][ # # ]
2711 [ # # ][ # # ]: 0 : << " (until " << s->until << " < now " << now << ")" << dendl;
[ # # ][ # # ]
2712 : 0 : messenger->mark_down(s->inst.addr);
2713 : 0 : remove_session(s);
2714 [ + + ]: 153 : } else if (!exited_quorum.is_zero()) {
2715 [ - + ]: 3 : if (now > (exited_quorum + 2 * g_conf->mon_lease)) {
2716 : : // boot the client Session because we've taken too long getting back in
2717 [ # # ][ # # ]: 0 : dout(10) << " trimming session " << s->inst
[ # # ][ # # ]
2718 [ # # ][ # # ]: 0 : << " because we've been out of quorum too long" << dendl;
2719 : 0 : messenger->mark_down(s->inst.addr);
2720 : 336 : remove_session(s);
2721 : : }
2722 : : }
2723 : : }
2724 : :
2725 [ + + ]: 61 : if (!maybe_wait_for_quorum.empty()) {
2726 : 4 : finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2727 : : }
2728 : :
2729 : 61 : new_tick();
2730 : 61 : }
2731 : :
2732 : 6 : int Monitor::check_fsid()
2733 : : {
2734 : 6 : ostringstream ss;
2735 [ + - ]: 6 : ss << monmap->get_fsid();
2736 : : string us = ss.str();
2737 [ + - ]: 6 : bufferlist ebl;
2738 [ + - ]: 6 : int r = store->get_bl_ss(ebl, "cluster_uuid", 0);
2739 [ + + ]: 6 : if (r < 0)
2740 : : return r;
2741 : :
2742 [ + - ][ + - ]: 3 : string es(ebl.c_str(), ebl.length());
2743 : :
2744 : : // only keep the first line
2745 : 3 : size_t pos = es.find_first_of('\n');
2746 [ + - ]: 3 : if (pos != string::npos)
2747 [ + - ]: 3 : es.resize(pos);
2748 : :
2749 [ + - ][ - + ]: 3 : dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
2750 [ + - ][ + - ]: 3 : if (es.length() < us.length() ||
[ - + ]
2751 : 9 : strncmp(us.c_str(), es.c_str(), us.length()) != 0) {
2752 [ # # ][ # # ]: 0 : derr << "error: cluster_uuid file exists with value '" << es
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
2753 [ # # ][ # # ]: 0 : << "', != our uuid " << monmap->get_fsid() << dendl;
[ # # ][ # # ]
2754 : : return -EEXIST;
2755 : : }
2756 : :
2757 [ + - ]: 6 : return 0;
2758 : : }
2759 : :
2760 : 3 : int Monitor::write_fsid()
2761 : : {
2762 : 3 : ostringstream ss;
2763 [ + - ][ + - ]: 3 : ss << monmap->get_fsid() << "\n";
2764 : : string us = ss.str();
2765 : :
2766 [ + - ]: 3 : bufferlist b;
2767 [ + - ]: 3 : b.append(us);
2768 [ + - ]: 3 : store->put_bl_ss(b, "cluster_uuid", 0);
2769 [ + - ]: 3 : return 0;
2770 : : }
2771 : :
2772 : : /*
2773 : : * this is the closest thing to a traditional 'mkfs' for ceph.
2774 : : * initialize the monitor state machines to their initial values.
2775 : : */
2776 : 3 : int Monitor::mkfs(bufferlist& osdmapbl)
2777 : : {
2778 : : // create it
2779 : 3 : int err = store->mkfs();
2780 [ - + ]: 3 : if (err) {
2781 [ # # ][ # # ]: 0 : derr << "store->mkfs failed with: " << cpp_strerror(err) << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
2782 : : return err;
2783 : : }
2784 : :
2785 : : // verify cluster fsid
2786 : 3 : int r = check_fsid();
2787 [ + - ]: 3 : if (r < 0 && r != -ENOENT)
2788 : : return r;
2789 : :
2790 : 3 : bufferlist magicbl;
2791 [ + - ][ + - ]: 3 : magicbl.append(CEPH_MON_ONDISK_MAGIC);
2792 [ + - ][ + - ]: 3 : magicbl.append("\n");
2793 [ + - ]: 3 : store->put_bl_ss(magicbl, "magic", 0);
2794 : :
2795 : :
2796 [ + - ][ + - ]: 3 : features = get_supported_features();
2797 [ + - ]: 3 : write_features();
2798 : :
2799 : : // save monmap, osdmap, keyring.
2800 [ + - ]: 3 : bufferlist monmapbl;
2801 [ + - ]: 3 : monmap->encode(monmapbl, CEPH_FEATURES_ALL);
2802 : 3 : monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
2803 [ + - ]: 3 : store->put_bl_ss(monmapbl, "mkfs", "monmap");
2804 : :
2805 [ + - ]: 3 : if (osdmapbl.length()) {
2806 : : // make sure it's a valid osdmap
2807 : : try {
2808 [ + - ]: 3 : OSDMap om;
2809 [ + - ][ + - ]: 3 : om.decode(osdmapbl);
2810 : : }
2811 [ # # ]: 0 : catch (buffer::error& e) {
2812 [ # # ][ # # ]: 0 : derr << "error decoding provided osdmap: " << e.what() << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
2813 : : return -EINVAL;
2814 : : }
2815 [ + - ]: 3 : store->put_bl_ss(osdmapbl, "mkfs", "osdmap");
2816 : : }
2817 : :
2818 : : KeyRing keyring;
2819 : : string keyring_filename;
2820 [ + - ][ - + ]: 3 : if (!ceph_resolve_file_search(g_conf->keyring, keyring_filename)) {
2821 [ # # ][ # # ]: 0 : derr << "unable to find a keyring file on " << g_conf->keyring << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
2822 : : return -ENOENT;
2823 : : }
2824 : :
2825 [ + - ]: 3 : r = keyring.load(g_ceph_context, keyring_filename);
2826 [ - + ]: 3 : if (r < 0) {
2827 [ # # ][ # # ]: 0 : derr << "unable to load initial keyring " << g_conf->keyring << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
2828 : : return r;
2829 : : }
2830 : :
2831 : : // put mon. key in external keyring; seed with everything else.
2832 [ + - ]: 3 : extract_save_mon_key(keyring);
2833 : :
2834 [ + - ]: 3 : bufferlist keyringbl;
2835 [ + - ]: 3 : keyring.encode_plaintext(keyringbl);
2836 [ + - ]: 3 : store->put_bl_ss(keyringbl, "mkfs", "keyring");
2837 : :
2838 : : // sync and write out fsid to indicate completion.
2839 [ + - ]: 3 : store->sync();
2840 [ + - ]: 3 : r = write_fsid();
2841 [ - + ]: 3 : if (r < 0)
2842 : : return r;
2843 : :
2844 [ + - ][ + - ]: 3 : return 0;
[ + - ]
2845 : : }
2846 : :
2847 : 6 : void Monitor::extract_save_mon_key(KeyRing& keyring)
2848 : : {
2849 : 6 : EntityName mon_name;
2850 [ + - ]: 6 : mon_name.set_type(CEPH_ENTITY_TYPE_MON);
2851 : : EntityAuth mon_key;
2852 [ + + ]: 6 : if (keyring.get_auth(mon_name, mon_key)) {
2853 [ + - ][ - + ]: 3 : dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
2854 : : KeyRing pkey;
2855 [ + - ]: 3 : pkey.add(mon_name, mon_key);
2856 [ + - ]: 3 : bufferlist bl;
2857 [ + - ]: 3 : pkey.encode_plaintext(bl);
2858 [ + - ]: 3 : store->put_bl_ss(bl, "keyring", NULL);
2859 [ + - ][ + - ]: 3 : keyring.remove(mon_name);
2860 [ + - ]: 6 : }
2861 : 6 : }
2862 : :
2863 : 6 : bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer, bool force_new)
2864 : : {
2865 [ - + ][ # # ]: 6 : dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id) << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
2866 : :
2867 [ + - ]: 6 : if (state == STATE_SHUTDOWN)
2868 : : return false;
2869 : :
2870 : : // we only connect to other monitors; every else connects to us.
2871 [ + - ]: 6 : if (service_id != CEPH_ENTITY_TYPE_MON)
2872 : : return false;
2873 : :
2874 [ + - ]: 6 : if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX))
2875 : : return false;
2876 : :
2877 : : CephXServiceTicketInfo auth_ticket_info;
2878 : : CephXSessionAuthInfo info;
2879 : : int ret;
2880 [ + - ]: 6 : EntityName name;
2881 [ + - ]: 6 : name.set_type(CEPH_ENTITY_TYPE_MON);
2882 : :
2883 : : auth_ticket_info.ticket.name = name;
2884 : 6 : auth_ticket_info.ticket.global_id = 0;
2885 : :
2886 : : CryptoKey secret;
2887 [ + - ][ - + ]: 6 : if (!keyring.get_secret(name, secret) &&
[ # # ][ - + ]
2888 [ # # ]: 0 : !key_server.get_secret(name, secret)) {
2889 [ # # ][ # # ]: 0 : dout(0) << " couldn't get secret for mon service from keyring or keyserver" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
2890 [ # # ]: 0 : stringstream ss;
2891 [ # # ]: 0 : key_server.list_secrets(ss);
2892 [ # # ][ # # ]: 0 : dout(0) << ss.str() << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
2893 [ # # ]: 0 : return false;
2894 : : }
2895 : :
2896 : : /* mon to mon authentication uses the private monitor shared key and not the
2897 : : rotating key */
2898 [ + - ]: 6 : ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info, secret, (uint64_t)-1);
2899 [ - + ]: 6 : if (ret < 0) {
2900 [ # # ][ # # ]: 0 : dout(0) << "ms_get_authorizer failed to build session auth_info for use with mon ret " << ret << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
2901 : : return false;
2902 : : }
2903 : :
2904 : : CephXTicketBlob blob;
2905 [ + - ][ - + ]: 6 : if (!cephx_build_service_ticket_blob(cct, info, blob)) {
2906 [ # # ][ # # ]: 0 : dout(0) << "ms_get_authorizer failed to build service ticket use with mon" << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
2907 : : return false;
2908 : : }
2909 [ + - ]: 6 : bufferlist ticket_data;
2910 : : ::encode(blob, ticket_data);
2911 : :
2912 [ + - ]: 6 : bufferlist::iterator iter = ticket_data.begin();
2913 : 6 : CephXTicketHandler handler(g_ceph_context, service_id);
2914 : : ::decode(handler.ticket, iter);
2915 : :
2916 [ + - ]: 6 : handler.session_key = info.session_key;
2917 : :
2918 [ + - ]: 6 : *authorizer = handler.build_authorizer(0);
2919 : :
2920 [ + - ][ + - ]: 12 : return true;
[ + - ]
2921 : : }
2922 : :
2923 : 60 : bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
2924 : : int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
2925 : : bool& isvalid, CryptoKey& session_key)
2926 : : {
2927 [ - + ][ # # ]: 60 : dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
[ # # ][ # # ]
2928 [ # # ][ # # ]: 0 : << " " << ceph_entity_type_name(peer_type)
[ # # ]
2929 [ # # ][ # # ]: 0 : << " protocol " << protocol << dendl;
[ # # ]
2930 : :
2931 [ + - ]: 60 : if (state == STATE_SHUTDOWN)
2932 : : return false;
2933 : :
2934 [ + + - + ]: 66 : if (peer_type == CEPH_ENTITY_TYPE_MON &&
[ + + ]
2935 : 6 : auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
2936 : : // monitor, and cephx is enabled
2937 : 6 : isvalid = false;
2938 [ + - ]: 6 : if (protocol == CEPH_AUTH_CEPHX) {
2939 : 6 : bufferlist::iterator iter = authorizer_data.begin();
2940 : : CephXServiceTicketInfo auth_ticket_info;
2941 : :
2942 [ + - ]: 6 : if (authorizer_data.length()) {
2943 : : int ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
2944 [ + - ]: 6 : auth_ticket_info, authorizer_reply);
2945 [ + - ]: 6 : if (ret >= 0) {
2946 [ + - ]: 6 : session_key = auth_ticket_info.session_key;
2947 : 6 : isvalid = true;
2948 : : } else {
2949 [ # # ][ # # ]: 0 : dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl;
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
2950 : : }
2951 : 6 : }
2952 : : } else {
2953 [ # # ][ # # ]: 0 : dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl;
[ # # ][ # # ]
2954 : : }
2955 : : } else {
2956 : : // who cares.
2957 : 60 : isvalid = true;
2958 : : }
2959 : : return true;
2960 : 1649 : };
|