1 // Written in the D programming language. 2 3 /** 4 * Fluent logger implementation. 5 * 6 * Fluentd is a missing event collector. 7 * 8 * Example: 9 * ----- 10 * struct Event 11 * { 12 * string text = "This is D"; 13 * long id = 0; 14 * } 15 * 16 * // Create a configuration 17 * FluentLogger.Configuration conf; 18 * conf.host = "backend1"; 19 * 20 * // Create a logger with tag prefix and configuration 21 * auto logger = new FluentLogger("app", conf); 22 * 23 * // Write Event object with "test" tag to Fluentd 24 * logger.post("test", Event()); 25 * // Fluentd accepts {"text":"This is D","id":0} at "app.test" input 26 * 27 * // Disconnect and perform cleanup 28 * logger.close(); // Or destroy(logger); 29 * ----- 30 * 31 * See_Also: 32 * $(LINK2 http://fluentd.org/, Welcome to Fluentd’s documentation!) 33 * 34 * Copyright: Copyright Masahiro Nakagawa 2012-. 35 * License: <a href="http://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>. 36 * Authors: Masahiro Nakagawa 37 */ 38 39 module fluent.logger; 40 41 private import core.sync.mutex; 42 private import std.array; 43 private import std.datetime : Clock, SysTime; 44 private import std.socket : getAddress, lastSocketError, ProtocolType, Socket, 45 SocketException, SocketShutdown, SocketType, TcpSocket; 46 47 debug(FluentLogger) import std.stdio; // TODO: replace with std.log 48 49 private import msgpack; 50 51 /** 52 * Base class for Fluent loggers 53 */ 54 abstract class Logger 55 { 56 // should be changed to interface? 57 protected: 58 immutable string prefix_; 59 60 61 public: 62 @safe 63 this(in string prefix) 64 { 65 prefix_ = prefix; 66 } 67 68 @property 69 const(ubyte[]) pendings() const; 70 71 void close(); 72 73 /** 74 * Pack the given $(D_PARAM record) using MessagePack and 75 * write it with the current timestamp using $(D_PSYMBOL write). 76 * 77 * If a prefix was given when the logger was created the 78 * tag is appended to the prefix when posting. This 79 * allocation may be avoided by given a $(D_KEYWORD null) 80 * prefix in the constructor and the full tag here. 81 * 82 * Params: 83 * tag = string used to tag the record 84 * record = data to be packed via msgpack and sent 85 * 86 * Returns: True if the data was successfully sent 87 * to the fluent server. False if the data 88 * was queued for sending later but no 89 * attempt was made to send to the remote 90 * host because of a previous error. 91 * See_Also: write 92 */ 93 bool post(T)(in string tag, auto ref const T record) 94 { 95 return post(tag, Clock.currTime(), record); 96 } 97 98 /** 99 * Pack the given $(D_PARAM record) using MessagePack and 100 * write it with the given timestamp using $(D_PSYMBOL write). 101 * 102 * If a prefix was given when the logger was created the 103 * tag is appended to the prefix when posting. This 104 * allocation may be avoided by giving a $(D_KEYWORD null) 105 * prefix in the constructor and the full tag here. 106 * 107 * Params: 108 * tag = string used to tag the record 109 * time = timestamp of the event being logged 110 * record = data to be packed via msgpack and sent 111 * 112 * Returns: True if the data was successfully sent 113 * to the fluent server. False if the data 114 * was queued for sending later but no 115 * attempt was made to send to the remote 116 * host because of a previous error. 117 * See_Also: write 118 */ 119 bool post(T)(in string tag, in SysTime time, auto ref const T record) 120 { 121 auto completeTag = prefix_.length ? prefix_ ~ "." ~ tag : tag; 122 return write(pack!true(completeTag, time.toUnixTime(), record)); 123 } 124 125 /** 126 * Write an array of ubyte to the logger. 127 * Client code should generally use the post() functions 128 * of $(D_PSYMBOL Logger) instead of calling write() directly. 129 * 130 * Params: 131 * data = The data to be written. 132 * Returns: True if the data was successfully sent 133 * to the fluent host. False if the data 134 * was queued for sending later but no 135 * attempt was made to send to the remote 136 * host because of a previous error. 137 * See_Also: post 138 */ 139 bool write(in ubyte[] data); 140 } 141 142 143 class Tester : Logger 144 { 145 private: 146 ubyte[] buffer_; // should have limit? 147 Mutex mutex_; 148 149 150 public: 151 @trusted 152 this(in string prefix) 153 { 154 super(prefix); 155 156 mutex_ = new Mutex(); 157 } 158 159 @property 160 override const(ubyte[]) pendings() const 161 { 162 synchronized(mutex_) { 163 return buffer_; 164 } 165 } 166 167 override void close() 168 { 169 buffer_ = null; 170 } 171 172 override bool write(in ubyte[] data) 173 { 174 synchronized(mutex_) { 175 buffer_ ~= data; 176 } 177 178 return true; 179 } 180 } 181 182 183 /** 184 * $(D_PSYMBOL FluentLogger) is a $(D_PSYMBOL Fluentd) client 185 */ 186 class FluentLogger : Logger 187 { 188 private import fluent.databuffer : dataBuffer, DataBuffer; 189 public: 190 /** 191 * FluentLogger configuration 192 */ 193 struct Configuration 194 { 195 string host = "localhost"; 196 ushort port = 24224; 197 size_t initialBufferSize = 64; 198 } 199 200 201 private: 202 immutable Configuration config_; 203 204 DataBuffer!ubyte buffer_ = void; 205 TcpSocket socket_; 206 207 // for reconnection 208 uint errorNum_; 209 SysTime errorTime_; 210 211 // for multi-threading 212 Mutex mutex_; 213 214 public: 215 216 /** 217 * Constructs a new $(D_PSYMBOL FluentLogger) instance using the given $(D_PSYMBOL Configuration). 218 * 219 * Params: 220 * prefix = Prefix to use before the tag for each post. May be null. 221 * config = Specifies the $(D_PSYMBOL Configuration) to use for this particular instance. 222 */ 223 @trusted 224 this(in string prefix, in Configuration config) 225 { 226 super(prefix); 227 228 config_ = config; 229 mutex_ = new Mutex(); 230 231 ubyte[] tmpBuf = new ubyte[config.initialBufferSize]; 232 buffer_ = dataBuffer(tmpBuf); 233 } 234 235 /** 236 * Destructor. 237 * 238 * Closes the logger. 239 */ 240 ~this() 241 { 242 close(); 243 buffer_.free(); 244 } 245 246 /** 247 * Returns: 248 * A slice into the buffer of data waiting to be sent that is only 249 * valid until the next post(), write(), or close(). 250 */ 251 @property 252 override const(ubyte[]) pendings() const 253 { 254 synchronized(mutex_) { 255 return buffer_[]; 256 } 257 } 258 259 /** 260 * Flush the remaining data in the buffer and close the 261 * connection to the remote fluent host. 262 * 263 * If the data in the buffer can't be sent it is discarded and 264 * the buffer is cleared. 265 * 266 * It is possible to continue using the $(D_PSYMBOL FluentLogger) after close() 267 * has been called. The next call to write (or post) will 268 * open a new connection to the fluent host. But doing this is discouraged 269 * because in general it is expected that no further operations 270 * are performed after calling close() on implementations of $(D_PSYMBOL Logger). 271 */ 272 override void close() 273 { 274 synchronized(mutex_) { 275 if (socket_ !is null) { 276 if (buffer_.length > 0) { 277 try { 278 send(buffer_[]); 279 buffer_.length = 0; 280 } catch (const SocketException e) { 281 debug(FluentLogger) { writeln("FluentLogger: Failed to flush logs. ", buffer_.length, " bytes not sent."); } 282 } 283 } 284 285 clearSocket(); 286 } 287 } 288 } 289 290 /** 291 * Write an array of ubyte to the logger. 292 * Client code should generally use the post() functions 293 * of $(D_PSYMBOL Logger) instead of calling write() directly. 294 * 295 * Params: 296 * data = The data to be written. 297 * Throws: $(D_PSYMBOL SocketException) if an error 298 * occurs sending data to the fluent host. 299 * Returns: True if the data was successfully sent 300 * to the fluent host. False if the data 301 * was queued for sending later but no 302 * attempt was made to send to the remote 303 * host because of a previous error. 304 * See_Also: post 305 */ 306 override bool write(in ubyte[] data) 307 { 308 synchronized(mutex_) { 309 buffer_.put(data); 310 if (!canWrite()) 311 return false; 312 313 try { 314 send(buffer_[]); 315 buffer_.length = 0; 316 } catch (SocketException e) { 317 errorNum_++; 318 errorTime_ = Clock.currTime(); 319 clearSocket(); 320 throw e; 321 } 322 } 323 324 return true; 325 } 326 327 328 private: 329 /** 330 * Connects to the remote host. 331 * 332 * Throws: 333 * $(D_PSYMBOL SocketException) if the connection fails. 334 * $(D_PSYMBOL Exception) if an address can't be found for the host. 335 */ 336 @trusted 337 void connect() 338 { 339 auto addresses = getAddress(config_.host, config_.port); 340 if (addresses.length == 0) 341 throw new Exception("Failed to resolve host: host = " ~ config_.host); 342 343 // hostname sometimes provides many address informations 344 foreach (i, ref address; addresses) { 345 try { 346 auto socket = new TcpSocket(address); 347 socket_ = socket; 348 errorNum_ = 0; 349 errorTime_ = SysTime.init; 350 351 debug(FluentLogger) { writeln("FluentLogger: Connected to: host = ", config_.host, ", port = ", config_.port); } 352 353 return; 354 } catch (SocketException e) { 355 clearSocket(); 356 357 // If all hosts can't be connected, raises an exeception 358 if (i == addresses.length - 1) { 359 errorNum_++; 360 errorTime_ = Clock.currTime(); 361 362 throw e; 363 } 364 } 365 } 366 } 367 368 /** 369 * Send the specified data to the fluent host. 370 * 371 * If not already connected to the fluent host 372 * connect() is called. Therefore this function 373 * throws the exceptions connect() throws in 374 * addition to the exceptions listed here. 375 * 376 * See_Also: connect 377 * 378 * Params: 379 * data = The data to send. 380 * Throws: 381 * $(D_PSYMBOL SocketException) if unable to send the data. 382 */ 383 @trusted 384 void send(in ubyte[] data) 385 { 386 if (socket_ is null) 387 connect(); 388 389 auto bytesSent = socket_.send(data); 390 if (bytesSent == Socket.ERROR) { 391 throw new SocketException("Unable to send to socket. ", lastSocketError()); 392 } 393 394 debug(FluentLogger) { writeln("FluentLogger: Sent ", data.length, " bytes"); } 395 } 396 397 /** 398 * Close the existing socket connection to the fluent host, if any. 399 */ 400 void clearSocket() nothrow 401 { 402 // reconnection at send method. 403 if (socket_ !is null) { 404 try { 405 socket_.shutdown(SocketShutdown.BOTH); 406 socket_.close(); 407 } catch (Exception e) { 408 /* Ignore any exceptions. We're done with 409 * the socket anyway so they don't matter. 410 */ 411 } 412 } 413 socket_ = null; 414 } 415 416 /** 417 * Specifies the maximum number of seconds to wait 418 * to send data to the fluent host from the last 419 * timestamp that an error was encountered. 420 */ 421 enum ReconnectionWaitingMax = 60u; 422 423 /** 424 * Returns true if data should attempt to be 425 * sent and false otherwise. 426 * 427 * If no errors have been encountered this function 428 * will return true. As errors are encountered the 429 * function will back off until at least $(D_PSYMBOL ReconnectionWaitingMax) 430 * seconds have passed since the last error. 431 */ 432 /* @safe */ @trusted 433 bool canWrite() 434 { 435 // prevent useless reconnection 436 if (errorTime_ != SysTime.init) { 437 // TODO: more complex? 438 uint secs = 2 ^^ errorNum_; 439 if (secs > ReconnectionWaitingMax) 440 secs = ReconnectionWaitingMax; 441 442 if ((Clock.currTime() - errorTime_).total!"seconds"() < secs) 443 return false; 444 } 445 446 return true; 447 } 448 }