1 // Written in the D programming language.
2 
3 /**
4  * Fluent logger implementation.
5  *
 * Fluentd is an open-source event (log) collector.
7  *
8  * Example:
9  * -----
10  * struct Event
11  * {
12  *     string text = "This is D";
13  *     long   id   = 0;
14  * }
15  *
16  * // Create a configuration
17  * FluentLogger.Configuration conf;
18  * conf.host = "backend1";
19  *
20  * // Create a logger with tag prefix and configuration
21  * auto logger = new FluentLogger("app", conf);
22  *
23  * // Write Event object with "test" tag to Fluentd 
24  * logger.post("test", Event());
25  * // Fluentd accepts {"text":"This is D","id":0} at "app.test" input
26  * 
27  * // Disconnect and perform cleanup
28  * logger.close(); // Or destroy(logger);
29  * -----
30  *
31  * See_Also:
32  *  $(LINK2 http://fluentd.org/, Welcome to Fluentd’s documentation!)
33  *
34  * Copyright: Copyright Masahiro Nakagawa 2012-.
35  * License:   <a href="http://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
36  * Authors:   Masahiro Nakagawa
37  */
38 
39 module fluent.logger;
40 
41 private import core.sync.mutex;
42 private import std.array;
43 private import std.datetime : Clock, SysTime;
44 private import std.socket : getAddress, lastSocketError, ProtocolType, Socket,
45                             SocketException, SocketShutdown, SocketType, TcpSocket;
46 
47 debug(FluentLogger) import std.stdio;  // TODO: replace with std.log
48 
49 private import msgpack;
50 
/**
 * Base class for Fluent loggers.
 *
 * Stores the optional tag prefix and implements the $(D_PSYMBOL post)
 * helpers on top of three primitives that every concrete logger has to
 * provide: $(D_PSYMBOL pendings), $(D_PSYMBOL close) and $(D_PSYMBOL write).
 */
abstract class Logger
{
    // should be changed to interface?
  protected:
    /// Tag prefix; when non-empty it is joined to every posted tag with a ".".
    immutable string prefix_;


  public:
    /**
     * Params:
     *  prefix = Prefix to put before the tag of each post. May be null.
     */
    @safe
    this(in string prefix)
    {
        prefix_ = prefix;
    }

    /**
     * Returns: The data currently held by the logger. The exact meaning
     *          and lifetime of the returned slice are defined by the
     *          implementing class.
     */
    // Explicitly abstract: a body-less member function that is not marked
    // abstract is treated as "defined elsewhere" and resolved by the linker.
    @property
    abstract const(ubyte[]) pendings() const;

    /**
     * Release whatever the logger holds. The exact behavior is defined
     * by the implementing class.
     */
    abstract void close();

    /**
     * Pack the given $(D_PARAM record) using MessagePack and
     * write it with the current timestamp using $(D_PSYMBOL write).
     *
     * If a prefix was given when the logger was created the
     * tag is appended to the prefix when posting. This
     * allocation may be avoided by giving a $(D_KEYWORD null)
     * prefix in the constructor and the full tag here.
     *
     * Params:
     *  tag = string used to tag the record
     *  record = data to be packed via msgpack and sent
     *
     * Returns: True if the data was successfully sent
     *          to the fluent server. False if the data
     *          was queued for sending later but no
     *          attempt was made to send to the remote
     *          host because of a previous error.
     * See_Also: write
     */
    bool post(T)(in string tag, auto ref const T record)
    {
        return post(tag, Clock.currTime(), record);
    }

    /**
     * Pack the given $(D_PARAM record) using MessagePack and
     * write it with the given timestamp using $(D_PSYMBOL write).
     *
     * If a prefix was given when the logger was created the
     * tag is appended to the prefix when posting. This
     * allocation may be avoided by giving a $(D_KEYWORD null)
     * prefix in the constructor and the full tag here.
     *
     * Params:
     *  tag = string used to tag the record
     *  time = timestamp of the event being logged
     *  record = data to be packed via msgpack and sent
     *
     * Returns: True if the data was successfully sent
     *          to the fluent server. False if the data
     *          was queued for sending later but no
     *          attempt was made to send to the remote
     *          host because of a previous error.
     * See_Also: write
     */
    bool post(T)(in string tag, in SysTime time, auto ref const T record)
    {
        // Only concatenate (and therefore allocate) when a prefix exists.
        auto completeTag = prefix_.length ? prefix_ ~ "." ~ tag : tag;
        // The packed message is (tag, unix time, record).
        return write(pack!true(completeTag, time.toUnixTime(), record));
    }

    /**
     * Write an array of ubyte to the logger.
     * Client code should generally use the post() functions
     * of $(D_PSYMBOL Logger) instead of calling write() directly.
     *
     * Params:
     *   data = The data to be written.
     * Returns: True if the data was successfully sent
     *          to the fluent host. False if the data
     *          was queued for sending later but no
     *          attempt was made to send to the remote
     *          host because of a previous error.
     * See_Also: post
     */
    abstract bool write(in ubyte[] data);
}
141 
142 
/**
 * $(D_PSYMBOL Logger) implementation that keeps everything in memory.
 *
 * Data passed to write() is appended to an internal buffer which can be
 * inspected through pendings(); nothing is sent anywhere.
 */
class Tester : Logger
{
  private:
    ubyte[] buffer_;  // should have limit?
    Mutex mutex_;     // guards buffer_


  public:
    /**
     * Params:
     *  prefix = Prefix to put before the tag of each post. May be null.
     */
    @trusted
    this(in string prefix)
    {
        super(prefix);

        mutex_ = new Mutex();
    }

    /**
     * Returns: Everything written so far and not yet dropped by close().
     */
    @property
    override const(ubyte[]) pendings() const
    {
        synchronized(mutex_) {
            return buffer_;
        }
    }

    /**
     * Drop all data collected so far.
     */
    override void close()
    {
        // Take the same lock as write() and pendings() so that resetting
        // the buffer cannot race with a concurrent append.
        synchronized(mutex_) {
            buffer_ = null;
        }
    }

    /**
     * Append $(D_PARAM data) to the in-memory buffer.
     *
     * Params:
     *   data = The data to be written.
     * Returns: Always true.
     */
    override bool write(in ubyte[] data)
    {
        synchronized(mutex_) {
            buffer_ ~= data;
        }

        return true;
    }
}
181 
182 
/**
 * $(D_PSYMBOL FluentLogger) is a $(D_PSYMBOL Fluentd) client.
 *
 * Written data is collected in an internal buffer and sent over a TCP
 * connection that is opened on demand. After a send or connection error
 * further attempts are held back for a growing number of seconds
 * (capped at $(D_PSYMBOL ReconnectionWaitingMax)); data written in the
 * meantime stays in the buffer.
 */
class FluentLogger : Logger
{
  private import fluent.databuffer : dataBuffer, DataBuffer;
  public:
    /**
     * FluentLogger configuration
     */
    struct Configuration
    {
        /// Host name handed to address resolution when connecting.
        string host = "localhost";
        /// TCP port of the fluent host.
        ushort port = 24224;
        /// Size in bytes of the array the internal buffer is created from.
        size_t initialBufferSize = 64;
    }


  private:
    immutable Configuration config_;

    DataBuffer!ubyte buffer_ = void;
    TcpSocket  socket_;

    // for reconnection
    uint    errorNum_;
    SysTime errorTime_;

    // for multi-threading
    Mutex mutex_;

  public:

    /**
     * Constructs a new $(D_PSYMBOL FluentLogger) instance using the given $(D_PSYMBOL Configuration).
     *
     * No connection is opened here; that happens on the first send.
     *
     * Params:
     *  prefix = Prefix to use before the tag for each post. May be null.
     *  config = Specifies the $(D_PSYMBOL Configuration) to use for this particular instance.
     */
    @trusted
    this(in string prefix, in Configuration config)
    {
        super(prefix);

        config_ = config;
        mutex_ = new Mutex();

        ubyte[] tmpBuf = new ubyte[config.initialBufferSize];
        buffer_ = dataBuffer(tmpBuf);
    }

    /**
     * Destructor.
     *
     * Closes the logger and frees the buffer.
     *
     * NOTE(review): this touches other GC-managed members (the mutex and
     * the socket). When the destructor is run by the garbage collector
     * rather than through destroy(), such references may no longer be
     * valid, so prefer calling close() or destroy() explicitly.
     */
    ~this()
    {
        close();
        buffer_.free();
    }

    /**
     * Returns:
     *  A slice into the buffer of data waiting to be sent that is only
     *  valid until the next post(), write(), or close().
     */
    @property
    override const(ubyte[]) pendings() const
    {
        synchronized(mutex_) {
            return buffer_[];
        }
    }

    /**
     * Flush the remaining data in the buffer and close the
     * connection to the remote fluent host.
     *
     * Nothing happens when there is no open connection; in particular
     * buffered data is left untouched. If the flush fails with a
     * $(D_PSYMBOL SocketException) the exception is swallowed, the data
     * stays in the buffer and the connection is closed anyway.
     *
     * It is possible to continue using the $(D_PSYMBOL FluentLogger) after close()
     * has been called. The next call to write (or post) will
     * open a new connection to the fluent host. But doing this is discouraged
     * because in general it is expected that no further operations
     * are performed after calling close() on implementations of $(D_PSYMBOL Logger).
     */
    override void close()
    {
        synchronized(mutex_) {
            if (socket_ !is null) {
                if (buffer_.length > 0) {
                    try {
                        send(buffer_[]);
                        buffer_.length = 0;
                    } catch (const SocketException e) {
                        debug(FluentLogger) { writeln("FluentLogger: Failed to flush logs. ", buffer_.length, " bytes not sent."); }
                    }
                }

                clearSocket();
            }
        }
    }

    /**
     * Write an array of ubyte to the logger.
     * Client code should generally use the post() functions
     * of $(D_PSYMBOL Logger) instead of calling write() directly.
     *
     * The data is always appended to the buffer first; the whole buffer
     * is then sent unless the logger is still backing off after an error.
     *
     * Params:
     *   data = The data to be written.
     * Throws: $(D_PSYMBOL SocketException) if an error
     *          occurs sending data to the fluent host.
     * Returns: True if the data was successfully sent
     *          to the fluent host. False if the data
     *          was queued for sending later but no
     *          attempt was made to send to the remote
     *          host because of a previous error.
     * See_Also: post
     */
    override bool write(in ubyte[] data)
    {
        synchronized(mutex_) {
            buffer_.put(data);
            if (!canWrite())
                return false;

            try {
                send(buffer_[]);
                buffer_.length = 0;
            } catch (SocketException e) {
                // Remember the failure for canWrite() and force a
                // reconnect on the next attempt; the buffer is kept.
                errorNum_++;
                errorTime_ = Clock.currTime();
                clearSocket();
                throw e;
            }
        }

        return true;
    }


  private:
    /**
     * Connects to the remote host.
     *
     * Every resolved address is tried in order; the first successful
     * connection is kept and resets the error bookkeeping.
     *
     * Throws:
     *  $(D_PSYMBOL SocketException) if the connection fails.
     *  $(D_PSYMBOL Exception) if an address can't be found for the host.
     */
    @trusted
    void connect()
    {
        auto addresses = getAddress(config_.host, config_.port);
        if (addresses.length == 0)
            throw new Exception("Failed to resolve host: host = " ~ config_.host);

        // A host name may resolve to several addresses.
        foreach (i, ref address; addresses) {
            TcpSocket candidate;
            try {
                // Create and connect in two steps so that a socket whose
                // connect() fails can be closed right away instead of
                // lingering until the GC finalizes it.
                candidate = new TcpSocket(address.addressFamily);
                candidate.connect(address);
            } catch (SocketException e) {
                if (candidate !is null)
                    candidate.close();
                clearSocket();

                // If none of the addresses can be connected to, raise the last exception.
                // NOTE(review): write() increments errorNum_ again when
                // this exception reaches it, so a failed connect counts
                // twice toward the back-off.
                if (i == addresses.length - 1) {
                    errorNum_++;
                    errorTime_ = Clock.currTime();

                    throw e;
                }

                continue;
            }

            socket_    = candidate;
            errorNum_  = 0;
            errorTime_ = SysTime.init;

            debug(FluentLogger) { writeln("FluentLogger: Connected to: host = ", config_.host, ", port = ", config_.port); }

            return;
        }
    }

    /**
     * Send the specified data to the fluent host.
     *
     * If not already connected to the fluent host
     * connect() is called. Therefore this function
     * throws the exceptions connect() throws in
     * addition to the exceptions listed here.
     *
     * The function returns only after all of $(D_PARAM data) has been
     * handed to the socket.
     *
     * See_Also: connect
     *
     * Params:
     *  data = The data to send.
     * Throws:
     *  $(D_PSYMBOL SocketException) if unable to send the data.
     */
    @trusted
    void send(in ubyte[] data)
    {
        if (socket_ is null)
            connect();

        // Socket.send may accept only part of the data; keep sending the
        // rest, otherwise the caller would drop bytes that never left.
        const(ubyte)[] remaining = data;
        while (remaining.length > 0) {
            immutable sent = socket_.send(remaining);
            if (sent == Socket.ERROR) {
                // The OS error text belongs in the message; the second
                // constructor parameter of SocketException is the file name.
                throw new SocketException("Unable to send to socket. " ~ lastSocketError());
            }
            if (sent == 0) {
                // No progress at all: fail instead of spinning forever.
                throw new SocketException("Unable to send to socket. No data was accepted.");
            }

            remaining = remaining[cast(size_t) sent .. $];
        }

        debug(FluentLogger) { writeln("FluentLogger: Sent ", data.length, " bytes"); }
    }

    /**
     * Close the existing socket connection to the fluent host, if any.
     */
    void clearSocket() nothrow
    {
        // reconnection at send method.
        if (socket_ !is null) {
            try {
                socket_.shutdown(SocketShutdown.BOTH);
                socket_.close();
            } catch (Exception e) {
                /* Ignore any exceptions. We're done with
                 * the socket anyway so they don't matter.
                 */
            }
        }
        socket_ = null;
    }

    /**
     * Specifies the maximum number of seconds to wait
     * to send data to the fluent host from the last
     * timestamp that an error was encountered.
     */
    enum ReconnectionWaitingMax = 60u;

    /**
     * Returns true if data should attempt to be
     * sent and false otherwise.
     *
     * If no errors have been encountered this function
     * will return true. As errors are encountered the
     * function will back off: the wait is 2 ^^ (number of errors)
     * seconds after the last error, capped at
     * $(D_PSYMBOL ReconnectionWaitingMax) seconds.
     */
    /* @safe */ @trusted
    bool canWrite()
    {
        // prevent useless reconnection
        if (errorTime_ != SysTime.init) {
            // TODO: more complex?
            // 2 ^^ errorNum_ does not fit in a uint once errorNum_ reaches
            // the bit width, which would collapse the back-off; start from
            // the cap and only lower it while the power is representable.
            enum maxShift = uint.sizeof * 8;
            uint secs = ReconnectionWaitingMax;
            if (errorNum_ < maxShift) {
                immutable uint backoff = 1u << errorNum_;
                if (backoff < ReconnectionWaitingMax)
                    secs = backoff;
            }

            if ((Clock.currTime() - errorTime_).total!"seconds"() < secs)
                return false;
        }

        return true;
    }
}