WebSocket++
0.7.0
C++ websocket client/server library
Main Page
Related Pages
Namespaces
Classes
Files
File List
websocketpp
impl
connection_impl.hpp
1
/*
2
* Copyright (c) 2014, Peter Thorson. All rights reserved.
3
*
4
* Redistribution and use in source and binary forms, with or without
5
* modification, are permitted provided that the following conditions are met:
6
* * Redistributions of source code must retain the above copyright
7
* notice, this list of conditions and the following disclaimer.
8
* * Redistributions in binary form must reproduce the above copyright
9
* notice, this list of conditions and the following disclaimer in the
10
* documentation and/or other materials provided with the distribution.
11
* * Neither the name of the WebSocket++ Project nor the
12
* names of its contributors may be used to endorse or promote products
13
* derived from this software without specific prior written permission.
14
*
15
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
* ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY
19
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25
*
26
*/
27
28
#
ifndef
WEBSOCKETPP_CONNECTION_IMPL_HPP
29
#
define
WEBSOCKETPP_CONNECTION_IMPL_HPP
30
31
#
include
<
websocketpp
/
processors
/
hybi00
.
hpp
>
32
#
include
<
websocketpp
/
processors
/
hybi07
.
hpp
>
33
#
include
<
websocketpp
/
processors
/
hybi08
.
hpp
>
34
#
include
<
websocketpp
/
processors
/
hybi13
.
hpp
>
35
36
#
include
<
websocketpp
/
processors
/
processor
.
hpp
>
37
38
#
include
<
websocketpp
/
common
/
platforms
.
hpp
>
39
#
include
<
websocketpp
/
common
/
system_error
.
hpp
>
40
41
#
include
<
algorithm
>
42
#
include
<
exception
>
43
#
include
<
sstream
>
44
#
include
<
string
>
45
#
include
<
utility
>
46
#
include
<
vector
>
47
48
namespace
websocketpp
{
49
50
namespace
istate
=
session
::
internal_state
;
51
52
template
<
typename
config>
53
void
connection<
config
>::
set_termination_handler
(
54
termination_handler
new_handler
)
55
{
56
m_alog
.
write
(
log
::
alevel
::
devel
,
57
"connection set_termination_handler"
);
58
59
//scoped_lock_type lock(m_connection_state_lock);
60
61
m_termination_handler
=
new_handler
;
62
}
63
64
template
<
typename
config
>
65
std
::
string
const
&
connection
<
config
>::
get_origin
()
const
{
66
//scoped_lock_type lock(m_connection_state_lock);
67
return
m_processor
->
get_origin
(
m_request
);
68
}
69
70
template
<
typename
config
>
71
size_t
connection
<
config
>::
get_buffered_amount
()
const
{
72
//scoped_lock_type lock(m_connection_state_lock);
73
return
m_send_buffer_size
;
74
}
75
76
template
<
typename
config
>
77
session
::
state
::
value
connection
<
config
>::
get_state
()
const
{
78
//scoped_lock_type lock(m_connection_state_lock);
79
return
m_state
;
80
}
81
82
template
<
typename
config
>
83
lib
::
error_code
connection
<
config
>::
send
(
std
::
string
const
&
payload
,
84
frame
::
opcode
::
value
op
)
85
{
86
message_ptr
msg
=
m_msg_manager
->
get_message
(
op
,
payload
.
size
());
87
msg
->
append_payload
(
payload
);
88
msg
->
set_compressed
(
true
);
89
90
return
send
(
msg
);
91
}
92
93
template
<
typename
config
>
94
lib
::
error_code
connection
<
config
>::
send
(
void
const
*
payload
,
size_t
len
,
95
frame
::
opcode
::
value
op
)
96
{
97
message_ptr
msg
=
m_msg_manager
->
get_message
(
op
,
len
);
98
msg
->
append_payload
(
payload
,
len
);
99
100
return
send
(
msg
);
101
}
102
103
template
<
typename
config
>
104
lib
::
error_code
connection
<
config
>::
send
(
typename
config
::
message_type
::
ptr
msg
)
105
{
106
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
107
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection send"
);
108
}
109
110
{
111
scoped_lock_type
lock
(
m_connection_state_lock
);
112
if
(
m_state
!=
session
::
state
::
open
) {
113
return
error
::
make_error_code
(
error
::
invalid_state
);
114
}
115
}
116
117
message_ptr
outgoing_msg
;
118
bool
needs_writing
=
false
;
119
120
if
(
msg
->
get_prepared
()) {
121
outgoing_msg
=
msg
;
122
123
scoped_lock_type
lock
(
m_write_lock
);
124
write_push
(
outgoing_msg
);
125
needs_writing
= !
m_write_flag
&& !
m_send_queue
.
empty
();
126
}
else
{
127
outgoing_msg
=
m_msg_manager
->
get_message
();
128
129
if
(!
outgoing_msg
) {
130
return
error
::
make_error_code
(
error
::
no_outgoing_buffers
);
131
}
132
133
scoped_lock_type
lock
(
m_write_lock
);
134
lib
::
error_code
ec
=
m_processor
->
prepare_data_frame
(
msg
,
outgoing_msg
);
135
136
if
(
ec
) {
137
return
ec
;
138
}
139
140
write_push
(
outgoing_msg
);
141
needs_writing
= !
m_write_flag
&& !
m_send_queue
.
empty
();
142
}
143
144
if
(
needs_writing
) {
145
transport_con_type
::
dispatch
(
lib
::
bind
(
146
&
type
::
write_frame
,
147
type
::
get_shared
()
148
));
149
}
150
151
return
lib
::
error_code
();
152
}
153
154
template
<
typename
config
>
155
void
connection
<
config
>::
ping
(
std
::
string
const
&
payload
,
lib
::
error_code
&
ec
) {
156
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
157
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection ping"
);
158
}
159
160
{
161
scoped_lock_type
lock
(
m_connection_state_lock
);
162
if
(
m_state
!=
session
::
state
::
open
) {
163
std
::
stringstream
ss
;
164
ss
<<
"connection::ping called from invalid state "
<<
m_state
;
165
m_alog
.
write
(
log
::
alevel
::
devel
,
ss
.
str
());
166
ec
=
error
::
make_error_code
(
error
::
invalid_state
);
167
return
;
168
}
169
}
170
171
message_ptr
msg
=
m_msg_manager
->
get_message
();
172
if
(!
msg
) {
173
ec
=
error
::
make_error_code
(
error
::
no_outgoing_buffers
);
174
return
;
175
}
176
177
ec
=
m_processor
->
prepare_ping
(
payload
,
msg
);
178
if
(
ec
) {
return
;}
179
180
// set ping timer if we are listening for one
181
if
(
m_pong_timeout_handler
) {
182
// Cancel any existing timers
183
if
(
m_ping_timer
) {
184
m_ping_timer
->
cancel
();
185
}
186
187
if
(
m_pong_timeout_dur
> 0) {
188
m_ping_timer
=
transport_con_type
::
set_timer
(
189
m_pong_timeout_dur
,
190
lib
::
bind
(
191
&
type
::
handle_pong_timeout
,
192
type
::
get_shared
(),
193
payload
,
194
lib
::
placeholders
::
_1
195
)
196
);
197
}
198
199
if
(!
m_ping_timer
) {
200
// Our transport doesn't support timers
201
m_elog
.
write
(
log
::
elevel
::
warn
,
"Warning: a pong_timeout_handler is \
202
set but the transport in use does not support timeouts."
);
203
}
204
}
205
206
bool
needs_writing
=
false
;
207
{
208
scoped_lock_type
lock
(
m_write_lock
);
209
write_push
(
msg
);
210
needs_writing
= !
m_write_flag
&& !
m_send_queue
.
empty
();
211
}
212
213
if
(
needs_writing
) {
214
transport_con_type
::
dispatch
(
lib
::
bind
(
215
&
type
::
write_frame
,
216
type
::
get_shared
()
217
));
218
}
219
220
ec
=
lib
::
error_code
();
221
}
222
223
template
<
typename
config
>
224
void
connection
<
config
>::
ping
(
std
::
string
const
&
payload
) {
225
lib
::
error_code
ec
;
226
ping
(
payload
,
ec
);
227
if
(
ec
) {
228
throw
exception
(
ec
);
229
}
230
}
231
232
template
<
typename
config
>
233
void
connection
<
config
>::
handle_pong_timeout
(
std
::
string
payload
,
234
lib
::
error_code
const
&
ec
)
235
{
236
if
(
ec
) {
237
if
(
ec
==
transport
::
error
::
operation_aborted
) {
238
// ignore, this is expected
239
return
;
240
}
241
242
m_elog
.
write
(
log
::
elevel
::
devel
,
"pong_timeout error: "
+
ec
.
message
());
243
return
;
244
}
245
246
if
(
m_pong_timeout_handler
) {
247
m_pong_timeout_handler
(
m_connection_hdl
,
payload
);
248
}
249
}
250
251
template
<
typename
config
>
252
void
connection
<
config
>::
pong
(
std
::
string
const
&
payload
,
lib
::
error_code
&
ec
) {
253
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
254
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection pong"
);
255
}
256
257
{
258
scoped_lock_type
lock
(
m_connection_state_lock
);
259
if
(
m_state
!=
session
::
state
::
open
) {
260
std
::
stringstream
ss
;
261
ss
<<
"connection::pong called from invalid state "
<<
m_state
;
262
m_alog
.
write
(
log
::
alevel
::
devel
,
ss
.
str
());
263
ec
=
error
::
make_error_code
(
error
::
invalid_state
);
264
return
;
265
}
266
}
267
268
message_ptr
msg
=
m_msg_manager
->
get_message
();
269
if
(!
msg
) {
270
ec
=
error
::
make_error_code
(
error
::
no_outgoing_buffers
);
271
return
;
272
}
273
274
ec
=
m_processor
->
prepare_pong
(
payload
,
msg
);
275
if
(
ec
) {
return
;}
276
277
bool
needs_writing
=
false
;
278
{
279
scoped_lock_type
lock
(
m_write_lock
);
280
write_push
(
msg
);
281
needs_writing
= !
m_write_flag
&& !
m_send_queue
.
empty
();
282
}
283
284
if
(
needs_writing
) {
285
transport_con_type
::
dispatch
(
lib
::
bind
(
286
&
type
::
write_frame
,
287
type
::
get_shared
()
288
));
289
}
290
291
ec
=
lib
::
error_code
();
292
}
293
294
template
<
typename
config
>
295
void
connection
<
config
>::
pong
(
std
::
string
const
&
payload
) {
296
lib
::
error_code
ec
;
297
pong
(
payload
,
ec
);
298
if
(
ec
) {
299
throw
exception
(
ec
);
300
}
301
}
302
303
template
<
typename
config
>
304
void
connection
<
config
>::
close
(
close
::
status
::
value
const
code
,
305
std
::
string
const
&
reason
,
lib
::
error_code
&
ec
)
306
{
307
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
308
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection close"
);
309
}
310
311
// Truncate reason to maximum size allowable in a close frame.
312
std
::
string
tr
(
reason
,0,
std
::
min
<
size_t
>(
reason
.
size
(),
313
frame
::
limits
::
close_reason_size
));
314
315
scoped_lock_type
lock
(
m_connection_state_lock
);
316
317
if
(
m_state
!=
session
::
state
::
open
) {
318
ec
=
error
::
make_error_code
(
error
::
invalid_state
);
319
return
;
320
}
321
322
ec
=
this
->
send_close_frame
(
code
,
tr
,
false
,
close
::
status
::
terminal
(
code
));
323
}
324
325
template
<
typename
config
>
326
void
connection
<
config
>::
close
(
close
::
status
::
value
const
code
,
327
std
::
string
const
&
reason
)
328
{
329
lib
::
error_code
ec
;
330
close
(
code
,
reason
,
ec
);
331
if
(
ec
) {
332
throw
exception
(
ec
);
333
}
334
}
335
336
/// Trigger the on_interrupt handler
337
/**
338
* This is thread safe if the transport is thread safe
339
*/
340
template
<
typename
config
>
341
lib
::
error_code
connection
<
config
>::
interrupt
() {
342
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection connection::interrupt"
);
343
return
transport_con_type
::
interrupt
(
344
lib
::
bind
(
345
&
type
::
handle_interrupt
,
346
type
::
get_shared
()
347
)
348
);
349
}
350
351
352
template
<
typename
config
>
353
void
connection
<
config
>::
handle_interrupt
() {
354
if
(
m_interrupt_handler
) {
355
m_interrupt_handler
(
m_connection_hdl
);
356
}
357
}
358
359
template
<
typename
config
>
360
lib
::
error_code
connection
<
config
>::
pause_reading
() {
361
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection connection::pause_reading"
);
362
return
transport_con_type
::
dispatch
(
363
lib
::
bind
(
364
&
type
::
handle_pause_reading
,
365
type
::
get_shared
()
366
)
367
);
368
}
369
370
/// Pause reading handler. Not safe to call directly
371
template
<
typename
config
>
372
void
connection
<
config
>::
handle_pause_reading
() {
373
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection connection::handle_pause_reading"
);
374
m_read_flag
=
false
;
375
}
376
377
template
<
typename
config
>
378
lib
::
error_code
connection
<
config
>::
resume_reading
() {
379
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection connection::resume_reading"
);
380
return
transport_con_type
::
dispatch
(
381
lib
::
bind
(
382
&
type
::
handle_resume_reading
,
383
type
::
get_shared
()
384
)
385
);
386
}
387
388
/// Resume reading helper method. Not safe to call directly
389
template
<
typename
config
>
390
void
connection
<
config
>::
handle_resume_reading
() {
391
m_read_flag
=
true
;
392
read_frame
();
393
}
394
395
396
397
398
399
400
401
402
403
404
405
template
<
typename
config
>
406
bool
connection
<
config
>::
get_secure
()
const
{
407
//scoped_lock_type lock(m_connection_state_lock);
408
return
m_uri
->
get_secure
();
409
}
410
411
template
<
typename
config
>
412
std
::
string
const
&
connection
<
config
>::
get_host
()
const
{
413
//scoped_lock_type lock(m_connection_state_lock);
414
return
m_uri
->
get_host
();
415
}
416
417
template
<
typename
config
>
418
std
::
string
const
&
connection
<
config
>::
get_resource
()
const
{
419
//scoped_lock_type lock(m_connection_state_lock);
420
return
m_uri
->
get_resource
();
421
}
422
423
template
<
typename
config
>
424
uint16_t
connection
<
config
>::
get_port
()
const
{
425
//scoped_lock_type lock(m_connection_state_lock);
426
return
m_uri
->
get_port
();
427
}
428
429
template
<
typename
config
>
430
uri_ptr
connection
<
config
>::
get_uri
()
const
{
431
//scoped_lock_type lock(m_connection_state_lock);
432
return
m_uri
;
433
}
434
435
template
<
typename
config
>
436
void
connection
<
config
>::
set_uri
(
uri_ptr
uri
) {
437
//scoped_lock_type lock(m_connection_state_lock);
438
m_uri
=
uri
;
439
}
440
441
442
443
444
445
446
template
<
typename
config
>
447
std
::
string
const
&
connection
<
config
>::
get_subprotocol
()
const
{
448
return
m_subprotocol
;
449
}
450
451
template
<
typename
config
>
452
std
::
vector
<
std
::
string
>
const
&
453
connection
<
config
>::
get_requested_subprotocols
()
const
{
454
return
m_requested_subprotocols
;
455
}
456
457
template
<
typename
config
>
458
void
connection
<
config
>::
add_subprotocol
(
std
::
string
const
&
value
,
459
lib
::
error_code
&
ec
)
460
{
461
if
(
m_is_server
) {
462
ec
=
error
::
make_error_code
(
error
::
client_only
);
463
return
;
464
}
465
466
// If the value is empty or has a non-RFC2616 token character it is invalid.
467
if
(
value
.
empty
() ||
std
::
find_if
(
value
.
begin
(),
value
.
end
(),
468
http
::
is_not_token_char
) !=
value
.
end
())
469
{
470
ec
=
error
::
make_error_code
(
error
::
invalid_subprotocol
);
471
return
;
472
}
473
474
m_requested_subprotocols
.
push_back
(
value
);
475
}
476
477
template
<
typename
config
>
478
void
connection
<
config
>::
add_subprotocol
(
std
::
string
const
&
value
) {
479
lib
::
error_code
ec
;
480
this
->
add_subprotocol
(
value
,
ec
);
481
if
(
ec
) {
482
throw
exception
(
ec
);
483
}
484
}
485
486
487
template
<
typename
config
>
488
void
connection
<
config
>::
select_subprotocol
(
std
::
string
const
&
value
,
489
lib
::
error_code
&
ec
)
490
{
491
if
(!
m_is_server
) {
492
ec
=
error
::
make_error_code
(
error
::
server_only
);
493
return
;
494
}
495
496
if
(
value
.
empty
()) {
497
ec
=
lib
::
error_code
();
498
return
;
499
}
500
501
std
::
vector
<
std
::
string
>::
iterator
it
;
502
503
it
=
std
::
find
(
m_requested_subprotocols
.
begin
(),
504
m_requested_subprotocols
.
end
(),
505
value
);
506
507
if
(
it
==
m_requested_subprotocols
.
end
()) {
508
ec
=
error
::
make_error_code
(
error
::
unrequested_subprotocol
);
509
return
;
510
}
511
512
m_subprotocol
=
value
;
513
}
514
515
template
<
typename
config
>
516
void
connection
<
config
>::
select_subprotocol
(
std
::
string
const
&
value
) {
517
lib
::
error_code
ec
;
518
this
->
select_subprotocol
(
value
,
ec
);
519
if
(
ec
) {
520
throw
exception
(
ec
);
521
}
522
}
523
524
525
template
<
typename
config
>
526
std
::
string
const
&
527
connection
<
config
>::
get_request_header
(
std
::
string
const
&
key
)
const
{
528
return
m_request
.
get_header
(
key
);
529
}
530
531
template
<
typename
config
>
532
std
::
string
const
&
533
connection
<
config
>::
get_request_body
()
const
{
534
return
m_request
.
get_body
();
535
}
536
537
template
<
typename
config
>
538
std
::
string
const
&
539
connection
<
config
>::
get_response_header
(
std
::
string
const
&
key
)
const
{
540
return
m_response
.
get_header
(
key
);
541
}
542
543
// TODO: EXCEPTION_FREE
544
template
<
typename
config
>
545
void
connection
<
config
>::
set_status
(
http
::
status_code
::
value
code
)
546
{
547
if
(
m_internal_state
!=
istate
::
PROCESS_HTTP_REQUEST
) {
548
throw
exception
(
"Call to set_status from invalid state"
,
549
error
::
make_error_code
(
error
::
invalid_state
));
550
}
551
m_response
.
set_status
(
code
);
552
}
553
554
// TODO: EXCEPTION_FREE
555
template
<
typename
config
>
556
void
connection
<
config
>::
set_status
(
http
::
status_code
::
value
code
,
557
std
::
string
const
&
msg
)
558
{
559
if
(
m_internal_state
!=
istate
::
PROCESS_HTTP_REQUEST
) {
560
throw
exception
(
"Call to set_status from invalid state"
,
561
error
::
make_error_code
(
error
::
invalid_state
));
562
}
563
564
m_response
.
set_status
(
code
,
msg
);
565
}
566
567
// TODO: EXCEPTION_FREE
568
template
<
typename
config
>
569
void
connection
<
config
>::
set_body
(
std
::
string
const
&
value
) {
570
if
(
m_internal_state
!=
istate
::
PROCESS_HTTP_REQUEST
) {
571
throw
exception
(
"Call to set_status from invalid state"
,
572
error
::
make_error_code
(
error
::
invalid_state
));
573
}
574
575
m_response
.
set_body
(
value
);
576
}
577
578
// TODO: EXCEPTION_FREE
579
template
<
typename
config
>
580
void
connection
<
config
>::
append_header
(
std
::
string
const
&
key
,
581
std
::
string
const
&
val
)
582
{
583
if
(
m_is_server
) {
584
if
(
m_internal_state
==
istate
::
PROCESS_HTTP_REQUEST
) {
585
// we are setting response headers for an incoming server connection
586
m_response
.
append_header
(
key
,
val
);
587
}
else
{
588
throw
exception
(
"Call to append_header from invalid state"
,
589
error
::
make_error_code
(
error
::
invalid_state
));
590
}
591
}
else
{
592
if
(
m_internal_state
==
istate
::
USER_INIT
) {
593
// we are setting initial headers for an outgoing client connection
594
m_request
.
append_header
(
key
,
val
);
595
}
else
{
596
throw
exception
(
"Call to append_header from invalid state"
,
597
error
::
make_error_code
(
error
::
invalid_state
));
598
}
599
}
600
}
601
602
// TODO: EXCEPTION_FREE
603
template
<
typename
config
>
604
void
connection
<
config
>::
replace_header
(
std
::
string
const
&
key
,
605
std
::
string
const
&
val
)
606
{
607
if
(
m_is_server
) {
608
if
(
m_internal_state
==
istate
::
PROCESS_HTTP_REQUEST
) {
609
// we are setting response headers for an incoming server connection
610
m_response
.
replace_header
(
key
,
val
);
611
}
else
{
612
throw
exception
(
"Call to replace_header from invalid state"
,
613
error
::
make_error_code
(
error
::
invalid_state
));
614
}
615
}
else
{
616
if
(
m_internal_state
==
istate
::
USER_INIT
) {
617
// we are setting initial headers for an outgoing client connection
618
m_request
.
replace_header
(
key
,
val
);
619
}
else
{
620
throw
exception
(
"Call to replace_header from invalid state"
,
621
error
::
make_error_code
(
error
::
invalid_state
));
622
}
623
}
624
}
625
626
// TODO: EXCEPTION_FREE
627
template
<
typename
config
>
628
void
connection
<
config
>::
remove_header
(
std
::
string
const
&
key
)
629
{
630
if
(
m_is_server
) {
631
if
(
m_internal_state
==
istate
::
PROCESS_HTTP_REQUEST
) {
632
// we are setting response headers for an incoming server connection
633
m_response
.
remove_header
(
key
);
634
}
else
{
635
throw
exception
(
"Call to remove_header from invalid state"
,
636
error
::
make_error_code
(
error
::
invalid_state
));
637
}
638
}
else
{
639
if
(
m_internal_state
==
istate
::
USER_INIT
) {
640
// we are setting initial headers for an outgoing client connection
641
m_request
.
remove_header
(
key
);
642
}
else
{
643
throw
exception
(
"Call to remove_header from invalid state"
,
644
error
::
make_error_code
(
error
::
invalid_state
));
645
}
646
}
647
}
648
649
/// Defer HTTP Response until later
650
/**
651
* Used in the http handler to defer the HTTP response for this connection
652
* until later. Handshake timers will be canceled and the connection will be
653
* left open until `send_http_response` or an equivalent is called.
654
*
655
* Warning: deferred connections won't time out and as a result can tie up
656
* resources.
657
*
658
* @return A status code, zero on success, non-zero otherwise
659
*/
660
template
<
typename
config
>
661
lib
::
error_code
connection
<
config
>::
defer_http_response
() {
662
// Cancel handshake timer, otherwise the connection will time out and we'll
663
// close the connection before the app has a chance to send a response.
664
if
(
m_handshake_timer
) {
665
m_handshake_timer
->
cancel
();
666
m_handshake_timer
.
reset
();
667
}
668
669
// Do something to signal deferral
670
m_http_state
=
session
::
http_state
::
deferred
;
671
672
return
lib
::
error_code
();
673
}
674
675
/// Send deferred HTTP Response (exception free)
676
/**
677
* Sends an http response to an HTTP connection that was deferred. This will
678
* send a complete response including all headers, status line, and body
679
* text. The connection will be closed afterwards.
680
*
681
* @since 0.6.0
682
*
683
* @param ec A status code, zero on success, non-zero otherwise
684
*/
685
template
<
typename
config
>
686
void
connection
<
config
>::
send_http_response
(
lib
::
error_code
&
ec
) {
687
{
688
scoped_lock_type
lock
(
m_connection_state_lock
);
689
if
(
m_http_state
!=
session
::
http_state
::
deferred
) {
690
ec
=
error
::
make_error_code
(
error
::
invalid_state
);
691
return
;
692
}
693
694
m_http_state
=
session
::
http_state
::
body_written
;
695
}
696
697
this
->
write_http_response
(
lib
::
error_code
());
698
ec
=
lib
::
error_code
();
699
}
700
701
template
<
typename
config
>
702
void
connection
<
config
>::
send_http_response
() {
703
lib
::
error_code
ec
;
704
this
->
send_http_response
(
ec
);
705
if
(
ec
) {
706
throw
exception
(
ec
);
707
}
708
}
709
710
711
712
713
/******** logic thread ********/
714
715
template
<
typename
config
>
716
void
connection
<
config
>::
start
() {
717
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection start"
);
718
719
if
(
m_internal_state
!=
istate
::
USER_INIT
) {
720
m_alog
.
write
(
log
::
alevel
::
devel
,
"Start called in invalid state"
);
721
this
->
terminate
(
error
::
make_error_code
(
error
::
invalid_state
));
722
return
;
723
}
724
725
m_internal_state
=
istate
::
TRANSPORT_INIT
;
726
727
// Depending on how the transport implements init this function may return
728
// immediately and call handle_transport_init later or call
729
// handle_transport_init from this function.
730
transport_con_type
::
init
(
731
lib
::
bind
(
732
&
type
::
handle_transport_init
,
733
type
::
get_shared
(),
734
lib
::
placeholders
::
_1
735
)
736
);
737
}
738
739
template
<
typename
config
>
740
void
connection
<
config
>::
handle_transport_init
(
lib
::
error_code
const
&
ec
) {
741
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection handle_transport_init"
);
742
743
lib
::
error_code
ecm
=
ec
;
744
745
if
(
m_internal_state
!=
istate
::
TRANSPORT_INIT
) {
746
m_alog
.
write
(
log
::
alevel
::
devel
,
747
"handle_transport_init must be called from transport init state"
);
748
ecm
=
error
::
make_error_code
(
error
::
invalid_state
);
749
}
750
751
if
(
ecm
) {
752
std
::
stringstream
s
;
753
s
<<
"handle_transport_init received error: "
<<
ecm
.
message
();
754
m_elog
.
write
(
log
::
elevel
::
rerror
,
s
.
str
());
755
756
this
->
terminate
(
ecm
);
757
return
;
758
}
759
760
// At this point the transport is ready to read and write bytes.
761
if
(
m_is_server
) {
762
m_internal_state
=
istate
::
READ_HTTP_REQUEST
;
763
this
->
read_handshake
(1);
764
}
else
{
765
// We are a client. Set the processor to the version specified in the
766
// config file and send a handshake request.
767
m_internal_state
=
istate
::
WRITE_HTTP_REQUEST
;
768
m_processor
=
get_processor
(
config
::
client_version
);
769
this
->
send_http_request
();
770
}
771
}
772
773
template
<
typename
config
>
774
void
connection
<
config
>::
read_handshake
(
size_t
num_bytes
) {
775
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection read_handshake"
);
776
777
if
(
m_open_handshake_timeout_dur
> 0) {
778
m_handshake_timer
=
transport_con_type
::
set_timer
(
779
m_open_handshake_timeout_dur
,
780
lib
::
bind
(
781
&
type
::
handle_open_handshake_timeout
,
782
type
::
get_shared
(),
783
lib
::
placeholders
::
_1
784
)
785
);
786
}
787
788
transport_con_type
::
async_read_at_least
(
789
num_bytes
,
790
m_buf
,
791
config
::
connection_read_buffer_size
,
792
lib
::
bind
(
793
&
type
::
handle_read_handshake
,
794
type
::
get_shared
(),
795
lib
::
placeholders
::
_1
,
796
lib
::
placeholders
::
_2
797
)
798
);
799
}
800
801
// All exit paths for this function need to call write_http_response() or submit
802
// a new read request with this function as the handler.
803
template
<
typename
config
>
804
void
connection
<
config
>::
handle_read_handshake
(
lib
::
error_code
const
&
ec
,
805
size_t
bytes_transferred
)
806
{
807
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection handle_read_handshake"
);
808
809
lib
::
error_code
ecm
=
ec
;
810
811
if
(!
ecm
) {
812
scoped_lock_type
lock
(
m_connection_state_lock
);
813
814
if
(
m_state
==
session
::
state
::
connecting
) {
815
if
(
m_internal_state
!=
istate
::
READ_HTTP_REQUEST
) {
816
ecm
=
error
::
make_error_code
(
error
::
invalid_state
);
817
}
818
}
else
if
(
m_state
==
session
::
state
::
closed
) {
819
// The connection was canceled while the response was being sent,
820
// usually by the handshake timer. This is basically expected
821
// (though hopefully rare) and there is nothing we can do so ignore.
822
m_alog
.
write
(
log
::
alevel
::
devel
,
823
"handle_read_handshake invoked after connection was closed"
);
824
return
;
825
}
else
{
826
ecm
=
error
::
make_error_code
(
error
::
invalid_state
);
827
}
828
}
829
830
if
(
ecm
) {
831
if
(
ecm
==
transport
::
error
::
eof
&&
m_state
==
session
::
state
::
closed
) {
832
// we expect to get eof if the connection is closed already
833
m_alog
.
write
(
log
::
alevel
::
devel
,
834
"got (expected) eof/state error from closed con"
);
835
return
;
836
}
837
838
log_err
(
log
::
elevel
::
rerror
,
"handle_read_handshake"
,
ecm
);
839
this
->
terminate
(
ecm
);
840
return
;
841
}
842
843
// Boundaries checking. TODO: How much of this should be done?
844
if
(
bytes_transferred
>
config
::
connection_read_buffer_size
) {
845
m_elog
.
write
(
log
::
elevel
::
fatal
,
"Fatal boundaries checking error."
);
846
this
->
terminate
(
make_error_code
(
error
::
general
));
847
return
;
848
}
849
850
size_t
bytes_processed
= 0;
851
try
{
852
bytes_processed
=
m_request
.
consume
(
m_buf
,
bytes_transferred
);
853
}
catch
(
http
::
exception
&
e
) {
854
// All HTTP exceptions will result in this request failing and an error
855
// response being returned. No more bytes will be read in this con.
856
m_response
.
set_status
(
e
.
m_error_code
,
e
.
m_error_msg
);
857
this
->
write_http_response_error
(
error
::
make_error_code
(
error
::
http_parse_error
));
858
return
;
859
}
860
861
// More paranoid boundaries checking.
862
// TODO: Is this overkill?
863
if
(
bytes_processed
>
bytes_transferred
) {
864
m_elog
.
write
(
log
::
elevel
::
fatal
,
"Fatal boundaries checking error."
);
865
this
->
terminate
(
make_error_code
(
error
::
general
));
866
return
;
867
}
868
869
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
870
std
::
stringstream
s
;
871
s
<<
"bytes_transferred: "
<<
bytes_transferred
872
<<
" bytes, bytes processed: "
<<
bytes_processed
<<
" bytes"
;
873
m_alog
.
write
(
log
::
alevel
::
devel
,
s
.
str
());
874
}
875
876
if
(
m_request
.
ready
()) {
877
lib
::
error_code
processor_ec
=
this
->
initialize_processor
();
878
if
(
processor_ec
) {
879
this
->
write_http_response_error
(
processor_ec
);
880
return
;
881
}
882
883
if
(
m_processor
&&
m_processor
->
get_version
() == 0) {
884
// Version 00 has an extra requirement to read some bytes after the
885
// handshake
886
if
(
bytes_transferred
-
bytes_processed
>= 8) {
887
m_request
.
replace_header
(
888
"Sec-WebSocket-Key3"
,
889
std
::
string
(
m_buf
+
bytes_processed
,
m_buf
+
bytes_processed
+8)
890
);
891
bytes_processed
+= 8;
892
}
else
{
893
// TODO: need more bytes
894
m_alog
.
write
(
log
::
alevel
::
devel
,
"short key3 read"
);
895
m_response
.
set_status
(
http
::
status_code
::
internal_server_error
);
896
this
->
write_http_response_error
(
processor
::
error
::
make_error_code
(
processor
::
error
::
short_key3
));
897
return
;
898
}
899
}
900
901
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
902
m_alog
.
write
(
log
::
alevel
::
devel
,
m_request
.
raw
());
903
if
(!
m_request
.
get_header
(
"Sec-WebSocket-Key3"
).
empty
()) {
904
m_alog
.
write
(
log
::
alevel
::
devel
,
905
utility
::
to_hex
(
m_request
.
get_header
(
"Sec-WebSocket-Key3"
)));
906
}
907
}
908
909
// The remaining bytes in m_buf are frame data. Copy them to the
910
// beginning of the buffer and note the length. They will be read after
911
// the handshake completes and before more bytes are read.
912
std
::
copy
(
m_buf
+
bytes_processed
,
m_buf
+
bytes_transferred
,
m_buf
);
913
m_buf_cursor
=
bytes_transferred
-
bytes_processed
;
914
915
916
m_internal_state
=
istate
::
PROCESS_HTTP_REQUEST
;
917
918
// We have the complete request. Process it.
919
lib
::
error_code
handshake_ec
=
this
->
process_handshake_request
();
920
921
// Write a response if this is a websocket connection or if it is an
922
// HTTP connection for which the response has not been deferred or
923
// started yet by a different system (i.e. still in init state).
924
if
(!
m_is_http
||
m_http_state
==
session
::
http_state
::
init
) {
925
this
->
write_http_response
(
handshake_ec
);
926
}
927
}
else
{
928
// read at least 1 more byte
929
transport_con_type
::
async_read_at_least
(
930
1,
931
m_buf
,
932
config
::
connection_read_buffer_size
,
933
lib
::
bind
(
934
&
type
::
handle_read_handshake
,
935
type
::
get_shared
(),
936
lib
::
placeholders
::
_1
,
937
lib
::
placeholders
::
_2
938
)
939
);
940
}
941
}
942
943
// write_http_response requires the request to be fully read and the connection
944
// to be in the PROCESS_HTTP_REQUEST state. In some cases we can detect errors
945
// before the request is fully read (specifically at a point where we aren't
946
// sure if the hybi00 key3 bytes need to be read). This method sets the correct
947
// state and calls write_http_response
948
template
<
typename
config
>
949
void
connection
<
config
>::
write_http_response_error
(
lib
::
error_code
const
&
ec
) {
950
if
(
m_internal_state
!=
istate
::
READ_HTTP_REQUEST
) {
951
m_alog
.
write
(
log
::
alevel
::
devel
,
952
"write_http_response_error called in invalid state"
);
953
this
->
terminate
(
error
::
make_error_code
(
error
::
invalid_state
));
954
return
;
955
}
956
957
m_internal_state
=
istate
::
PROCESS_HTTP_REQUEST
;
958
959
this
->
write_http_response
(
ec
);
960
}
961
962
// All exit paths for this function need to call write_http_response() or submit
963
// a new read request with this function as the handler.
964
template
<
typename
config
>
965
void
connection
<
config
>::
handle_read_frame
(
lib
::
error_code
const
&
ec
,
966
size_t
bytes_transferred
)
967
{
968
//m_alog.write(log::alevel::devel,"connection handle_read_frame");
969
970
lib
::
error_code
ecm
=
ec
;
971
972
if
(!
ecm
&&
m_internal_state
!=
istate
::
PROCESS_CONNECTION
) {
973
ecm
=
error
::
make_error_code
(
error
::
invalid_state
);
974
}
975
976
if
(
ecm
) {
977
log
::
level
echannel
=
log
::
elevel
::
rerror
;
978
979
if
(
ecm
==
transport
::
error
::
eof
) {
980
if
(
m_state
==
session
::
state
::
closed
) {
981
// we expect to get eof if the connection is closed already
982
// just ignore it
983
m_alog
.
write
(
log
::
alevel
::
devel
,
"got eof from closed con"
);
984
return
;
985
}
else
if
(
m_state
==
session
::
state
::
closing
&& !
m_is_server
) {
986
// If we are a client we expect to get eof in the closing state,
987
// this is a signal to terminate our end of the connection after
988
// the closing handshake
989
terminate
(
lib
::
error_code
());
990
return
;
991
}
992
}
else
if
(
ecm
==
error
::
invalid_state
) {
993
// In general, invalid state errors in the closed state are the
994
// result of handlers that were in the system already when the state
995
// changed and should be ignored as they pose no problems and there
996
// is nothing useful that we can do about them.
997
if
(
m_state
==
session
::
state
::
closed
) {
998
m_alog
.
write
(
log
::
alevel
::
devel
,
999
"handle_read_frame: got invalid istate in closed state"
);
1000
return
;
1001
}
1002
}
else
if
(
ecm
==
transport
::
error
::
tls_short_read
) {
1003
if
(
m_state
==
session
::
state
::
closed
) {
1004
// We expect to get a TLS short read if we try to read after the
1005
// connection is closed. If this happens ignore and exit the
1006
// read frame path.
1007
terminate
(
lib
::
error_code
());
1008
return
;
1009
}
1010
echannel
=
log
::
elevel
::
rerror
;
1011
}
else
if
(
ecm
==
transport
::
error
::
action_after_shutdown
) {
1012
echannel
=
log
::
elevel
::
info
;
1013
}
1014
1015
log_err
(
echannel
,
"handle_read_frame"
,
ecm
);
1016
this
->
terminate
(
ecm
);
1017
return
;
1018
}
1019
1020
// Boundaries checking. TODO: How much of this should be done?
1021
/*if (bytes_transferred > config::connection_read_buffer_size) {
1022
m_elog.write(log::elevel::fatal,"Fatal boundaries checking error");
1023
this->terminate(make_error_code(error::general));
1024
return;
1025
}*/
1026
1027
size_t
p
= 0;
1028
1029
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
1030
std
::
stringstream
s
;
1031
s
<<
"p = "
<<
p
<<
" bytes transferred = "
<<
bytes_transferred
;
1032
m_alog
.
write
(
log
::
alevel
::
devel
,
s
.
str
());
1033
}
1034
1035
while
(
p
<
bytes_transferred
) {
1036
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
1037
std
::
stringstream
s
;
1038
s
<<
"calling consume with "
<<
bytes_transferred
-
p
<<
" bytes"
;
1039
m_alog
.
write
(
log
::
alevel
::
devel
,
s
.
str
());
1040
}
1041
1042
lib
::
error_code
consume_ec
;
1043
1044
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
1045
std
::
stringstream
s
;
1046
s
<<
"Processing Bytes: "
<<
utility
::
to_hex
(
reinterpret_cast
<
uint8_t
*>(
m_buf
)+
p
,
bytes_transferred
-
p
);
1047
m_alog
.
write
(
log
::
alevel
::
devel
,
s
.
str
());
1048
}
1049
1050
p
+=
m_processor
->
consume
(
1051
reinterpret_cast
<
uint8_t
*>(
m_buf
)+
p
,
1052
bytes_transferred
-
p
,
1053
consume_ec
1054
);
1055
1056
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
1057
std
::
stringstream
s
;
1058
s
<<
"bytes left after consume: "
<<
bytes_transferred
-
p
;
1059
m_alog
.
write
(
log
::
alevel
::
devel
,
s
.
str
());
1060
}
1061
if
(
consume_ec
) {
1062
log_err
(
log
::
elevel
::
rerror
,
"consume"
,
consume_ec
);
1063
1064
if
(
config
::
drop_on_protocol_error
) {
1065
this
->
terminate
(
consume_ec
);
1066
return
;
1067
}
else
{
1068
lib
::
error_code
close_ec
;
1069
this
->
close
(
1070
processor
::
error
::
to_ws
(
consume_ec
),
1071
consume_ec
.
message
(),
1072
close_ec
1073
);
1074
1075
if
(
close_ec
) {
1076
log_err
(
log
::
elevel
::
fatal
,
"Protocol error close frame "
,
close_ec
);
1077
this
->
terminate
(
close_ec
);
1078
return
;
1079
}
1080
}
1081
return
;
1082
}
1083
1084
if
(
m_processor
->
ready
()) {
1085
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
1086
std
::
stringstream
s
;
1087
s
<<
"Complete message received. Dispatching"
;
1088
m_alog
.
write
(
log
::
alevel
::
devel
,
s
.
str
());
1089
}
1090
1091
message_ptr
msg
=
m_processor
->
get_message
();
1092
1093
if
(!
msg
) {
1094
m_alog
.
write
(
log
::
alevel
::
devel
,
"null message from m_processor"
);
1095
}
else
if
(!
is_control
(
msg
->
get_opcode
())) {
1096
// data message, dispatch to user
1097
if
(
m_state
!=
session
::
state
::
open
) {
1098
m_elog
.
write
(
log
::
elevel
::
warn
,
"got non-close frame while closing"
);
1099
}
else
if
(
m_message_handler
) {
1100
m_message_handler
(
m_connection_hdl
,
msg
);
1101
}
1102
}
else
{
1103
process_control_frame
(
msg
);
1104
}
1105
}
1106
}
1107
1108
read_frame
();
1109
}
1110
1111
/// Issue a new transport read unless reading is paused.
1112
template
<
typename
config
>
1113
void
connection
<
config
>::
read_frame
() {
1114
if
(!
m_read_flag
) {
1115
return
;
1116
}
1117
1118
transport_con_type
::
async_read_at_least
(
1119
// std::min wont work with undefined static const values.
1120
// TODO: is there a more elegant way to do this?
1121
// Need to determine if requesting 1 byte or the exact number of bytes
1122
// is better here. 1 byte lets us be a bit more responsive at a
1123
// potential expense of additional runs through handle_read_frame
1124
/*(m_processor->get_bytes_needed() > config::connection_read_buffer_size ?
1125
config::connection_read_buffer_size : m_processor->get_bytes_needed())*/
1126
1,
1127
m_buf
,
1128
config
::
connection_read_buffer_size
,
1129
m_handle_read_frame
1130
);
1131
}
1132
1133
template
<
typename
config
>
1134
lib
::
error_code
connection
<
config
>::
initialize_processor
() {
1135
m_alog
.
write
(
log
::
alevel
::
devel
,
"initialize_processor"
);
1136
1137
// if it isn't a websocket handshake nothing to do.
1138
if
(!
processor
::
is_websocket_handshake
(
m_request
)) {
1139
return
lib
::
error_code
();
1140
}
1141
1142
int
version
=
processor
::
get_websocket_version
(
m_request
);
1143
1144
if
(
version
< 0) {
1145
m_alog
.
write
(
log
::
alevel
::
devel
,
"BAD REQUEST: can't determine version"
);
1146
m_response
.
set_status
(
http
::
status_code
::
bad_request
);
1147
return
error
::
make_error_code
(
error
::
invalid_version
);
1148
}
1149
1150
m_processor
=
get_processor
(
version
);
1151
1152
// if the processor is not null we are done
1153
if
(
m_processor
) {
1154
return
lib
::
error_code
();
1155
}
1156
1157
// We don't have a processor for this version. Return bad request
1158
// with Sec-WebSocket-Version header filled with values we do accept
1159
m_alog
.
write
(
log
::
alevel
::
devel
,
"BAD REQUEST: no processor for version"
);
1160
m_response
.
set_status
(
http
::
status_code
::
bad_request
);
1161
1162
std
::
stringstream
ss
;
1163
std
::
string
sep
;
1164
std
::
vector
<
int
>::
const_iterator
it
;
1165
for
(
it
=
versions_supported
.
begin
();
it
!=
versions_supported
.
end
();
it
++)
1166
{
1167
ss
<<
sep
<< *
it
;
1168
sep
=
","
;
1169
}
1170
1171
m_response
.
replace_header
(
"Sec-WebSocket-Version"
,
ss
.
str
());
1172
return
error
::
make_error_code
(
error
::
unsupported_version
);
1173
}
1174
1175
template
<
typename
config
>
1176
lib
::
error_code
connection
<
config
>::
process_handshake_request
() {
1177
m_alog
.
write
(
log
::
alevel
::
devel
,
"process handshake request"
);
1178
1179
if
(!
processor
::
is_websocket_handshake
(
m_request
)) {
1180
// this is not a websocket handshake. Process as plain HTTP
1181
m_alog
.
write
(
log
::
alevel
::
devel
,
"HTTP REQUEST"
);
1182
1183
// extract URI from request
1184
m_uri
=
processor
::
get_uri_from_host
(
1185
m_request
,
1186
(
transport_con_type
::
is_secure
() ?
"https"
:
"http"
)
1187
);
1188
1189
if
(!
m_uri
->
get_valid
()) {
1190
m_alog
.
write
(
log
::
alevel
::
devel
,
"Bad request: failed to parse uri"
);
1191
m_response
.
set_status
(
http
::
status_code
::
bad_request
);
1192
return
error
::
make_error_code
(
error
::
invalid_uri
);
1193
}
1194
1195
if
(
m_http_handler
) {
1196
m_is_http
=
true
;
1197
m_http_handler
(
m_connection_hdl
);
1198
1199
if
(
m_state
==
session
::
state
::
closed
) {
1200
return
error
::
make_error_code
(
error
::
http_connection_ended
);
1201
}
1202
}
else
{
1203
set_status
(
http
::
status_code
::
upgrade_required
);
1204
return
error
::
make_error_code
(
error
::
upgrade_required
);
1205
}
1206
1207
return
lib
::
error_code
();
1208
}
1209
1210
lib
::
error_code
ec
=
m_processor
->
validate_handshake
(
m_request
);
1211
1212
// Validate: make sure all required elements are present.
1213
if
(
ec
){
1214
// Not a valid handshake request
1215
m_alog
.
write
(
log
::
alevel
::
devel
,
"Bad request "
+
ec
.
message
());
1216
m_response
.
set_status
(
http
::
status_code
::
bad_request
);
1217
return
ec
;
1218
}
1219
1220
// Read extension parameters and set up values necessary for the end user
1221
// to complete extension negotiation.
1222
std
::
pair
<
lib
::
error_code
,
std
::
string
>
neg_results
;
1223
neg_results
=
m_processor
->
negotiate_extensions
(
m_request
);
1224
1225
if
(
neg_results
.
first
) {
1226
// There was a fatal error in extension parsing that should result in
1227
// a failed connection attempt.
1228
m_alog
.
write
(
log
::
alevel
::
devel
,
"Bad request: "
+
neg_results
.
first
.
message
());
1229
m_response
.
set_status
(
http
::
status_code
::
bad_request
);
1230
return
neg_results
.
first
;
1231
}
else
{
1232
// extension negotiation succeeded, set response header accordingly
1233
// we don't send an empty extensions header because it breaks many
1234
// clients.
1235
if
(
neg_results
.
second
.
size
() > 0) {
1236
m_response
.
replace_header
(
"Sec-WebSocket-Extensions"
,
1237
neg_results
.
second
);
1238
}
1239
}
1240
1241
// extract URI from request
1242
m_uri
=
m_processor
->
get_uri
(
m_request
);
1243
1244
1245
if
(!
m_uri
->
get_valid
()) {
1246
m_alog
.
write
(
log
::
alevel
::
devel
,
"Bad request: failed to parse uri"
);
1247
m_response
.
set_status
(
http
::
status_code
::
bad_request
);
1248
return
error
::
make_error_code
(
error
::
invalid_uri
);
1249
}
1250
1251
// extract subprotocols
1252
lib
::
error_code
subp_ec
=
m_processor
->
extract_subprotocols
(
m_request
,
1253
m_requested_subprotocols
);
1254
1255
if
(
subp_ec
) {
1256
// should we do anything?
1257
}
1258
1259
// Ask application to validate the connection
1260
if
(!
m_validate_handler
||
m_validate_handler
(
m_connection_hdl
)) {
1261
m_response
.
set_status
(
http
::
status_code
::
switching_protocols
);
1262
1263
// Write the appropriate response headers based on request and
1264
// processor version
1265
ec
=
m_processor
->
process_handshake
(
m_request
,
m_subprotocol
,
m_response
);
1266
1267
if
(
ec
) {
1268
std
::
stringstream
s
;
1269
s
<<
"Processing error: "
<<
ec
<<
"("
<<
ec
.
message
() <<
")"
;
1270
m_alog
.
write
(
log
::
alevel
::
devel
,
s
.
str
());
1271
1272
m_response
.
set_status
(
http
::
status_code
::
internal_server_error
);
1273
return
ec
;
1274
}
1275
}
else
{
1276
// User application has rejected the handshake
1277
m_alog
.
write
(
log
::
alevel
::
devel
,
"USER REJECT"
);
1278
1279
// Use Bad Request if the user handler did not provide a more
1280
// specific http response error code.
1281
// TODO: is there a better default?
1282
if
(
m_response
.
get_status_code
() ==
http
::
status_code
::
uninitialized
) {
1283
m_response
.
set_status
(
http
::
status_code
::
bad_request
);
1284
}
1285
1286
return
error
::
make_error_code
(
error
::
rejected
);
1287
}
1288
1289
return
lib
::
error_code
();
1290
}
1291
1292
template
<
typename
config
>
1293
void
connection
<
config
>::
write_http_response
(
lib
::
error_code
const
&
ec
) {
1294
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection write_http_response"
);
1295
1296
if
(
ec
==
error
::
make_error_code
(
error
::
http_connection_ended
)) {
1297
m_alog
.
write
(
log
::
alevel
::
http
,
"An HTTP handler took over the connection."
);
1298
return
;
1299
}
1300
1301
if
(
m_response
.
get_status_code
() ==
http
::
status_code
::
uninitialized
) {
1302
m_response
.
set_status
(
http
::
status_code
::
internal_server_error
);
1303
m_ec
=
error
::
make_error_code
(
error
::
general
);
1304
}
else
{
1305
m_ec
=
ec
;
1306
}
1307
1308
m_response
.
set_version
(
"HTTP/1.1"
);
1309
1310
// Set server header based on the user agent settings
1311
if
(
m_response
.
get_header
(
"Server"
).
empty
()) {
1312
if
(!
m_user_agent
.
empty
()) {
1313
m_response
.
replace_header
(
"Server"
,
m_user_agent
);
1314
}
else
{
1315
m_response
.
remove_header
(
"Server"
);
1316
}
1317
}
1318
1319
// have the processor generate the raw bytes for the wire (if it exists)
1320
if
(
m_processor
) {
1321
m_handshake_buffer
=
m_processor
->
get_raw
(
m_response
);
1322
}
else
{
1323
// a processor wont exist for raw HTTP responses.
1324
m_handshake_buffer
=
m_response
.
raw
();
1325
}
1326
1327
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
1328
m_alog
.
write
(
log
::
alevel
::
devel
,
"Raw Handshake response:\n"
+
m_handshake_buffer
);
1329
if
(!
m_response
.
get_header
(
"Sec-WebSocket-Key3"
).
empty
()) {
1330
m_alog
.
write
(
log
::
alevel
::
devel
,
1331
utility
::
to_hex
(
m_response
.
get_header
(
"Sec-WebSocket-Key3"
)));
1332
}
1333
}
1334
1335
// write raw bytes
1336
transport_con_type
::
async_write
(
1337
m_handshake_buffer
.
data
(),
1338
m_handshake_buffer
.
size
(),
1339
lib
::
bind
(
1340
&
type
::
handle_write_http_response
,
1341
type
::
get_shared
(),
1342
lib
::
placeholders
::
_1
1343
)
1344
);
1345
}
1346
1347
template
<
typename
config
>
1348
void
connection
<
config
>::
handle_write_http_response
(
lib
::
error_code
const
&
ec
) {
1349
m_alog
.
write
(
log
::
alevel
::
devel
,
"handle_write_http_response"
);
1350
1351
lib
::
error_code
ecm
=
ec
;
1352
1353
if
(!
ecm
) {
1354
scoped_lock_type
lock
(
m_connection_state_lock
);
1355
1356
if
(
m_state
==
session
::
state
::
connecting
) {
1357
if
(
m_internal_state
!=
istate
::
PROCESS_HTTP_REQUEST
) {
1358
ecm
=
error
::
make_error_code
(
error
::
invalid_state
);
1359
}
1360
}
else
if
(
m_state
==
session
::
state
::
closed
) {
1361
// The connection was canceled while the response was being sent,
1362
// usually by the handshake timer. This is basically expected
1363
// (though hopefully rare) and there is nothing we can do so ignore.
1364
m_alog
.
write
(
log
::
alevel
::
devel
,
1365
"handle_write_http_response invoked after connection was closed"
);
1366
return
;
1367
}
else
{
1368
ecm
=
error
::
make_error_code
(
error
::
invalid_state
);
1369
}
1370
}
1371
1372
if
(
ecm
) {
1373
if
(
ecm
==
transport
::
error
::
eof
&&
m_state
==
session
::
state
::
closed
) {
1374
// we expect to get eof if the connection is closed already
1375
m_alog
.
write
(
log
::
alevel
::
devel
,
1376
"got (expected) eof/state error from closed con"
);
1377
return
;
1378
}
1379
1380
log_err
(
log
::
elevel
::
rerror
,
"handle_write_http_response"
,
ecm
);
1381
this
->
terminate
(
ecm
);
1382
return
;
1383
}
1384
1385
if
(
m_handshake_timer
) {
1386
m_handshake_timer
->
cancel
();
1387
m_handshake_timer
.
reset
();
1388
}
1389
1390
if
(
m_response
.
get_status_code
() !=
http
::
status_code
::
switching_protocols
)
1391
{
1392
/*if (m_processor || m_ec == error::http_parse_error ||
1393
m_ec == error::invalid_version || m_ec == error::unsupported_version
1394
|| m_ec == error::upgrade_required)
1395
{*/
1396
if
(!
m_is_http
) {
1397
std
::
stringstream
s
;
1398
s
<<
"Handshake ended with HTTP error: "
1399
<<
m_response
.
get_status_code
();
1400
m_elog
.
write
(
log
::
elevel
::
rerror
,
s
.
str
());
1401
}
else
{
1402
// if this was not a websocket connection, we have written
1403
// the expected response and the connection can be closed.
1404
1405
this
->
log_http_result
();
1406
1407
if
(
m_ec
) {
1408
m_alog
.
write
(
log
::
alevel
::
devel
,
1409
"got to writing HTTP results with m_ec set: "
+
m_ec
.
message
());
1410
}
1411
m_ec
=
make_error_code
(
error
::
http_connection_ended
);
1412
}
1413
1414
this
->
terminate
(
m_ec
);
1415
return
;
1416
}
1417
1418
this
->
log_open_result
();
1419
1420
m_internal_state
=
istate
::
PROCESS_CONNECTION
;
1421
m_state
=
session
::
state
::
open
;
1422
1423
if
(
m_open_handler
) {
1424
m_open_handler
(
m_connection_hdl
);
1425
}
1426
1427
this
->
handle_read_frame
(
lib
::
error_code
(),
m_buf_cursor
);
1428
}
1429
1430
template
<
typename
config
>
1431
void
connection
<
config
>::
send_http_request
() {
1432
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection send_http_request"
);
1433
1434
// TODO: origin header?
1435
1436
// Have the protocol processor fill in the appropriate fields based on the
1437
// selected client version
1438
if
(
m_processor
) {
1439
lib
::
error_code
ec
;
1440
ec
=
m_processor
->
client_handshake_request
(
m_request
,
m_uri
,
1441
m_requested_subprotocols
);
1442
1443
if
(
ec
) {
1444
log_err
(
log
::
elevel
::
fatal
,
"Internal library error: Processor"
,
ec
);
1445
return
;
1446
}
1447
}
else
{
1448
m_elog
.
write
(
log
::
elevel
::
fatal
,
"Internal library error: missing processor"
);
1449
return
;
1450
}
1451
1452
// Unless the user has overridden the user agent, send generic WS++ UA.
1453
if
(
m_request
.
get_header
(
"User-Agent"
).
empty
()) {
1454
if
(!
m_user_agent
.
empty
()) {
1455
m_request
.
replace_header
(
"User-Agent"
,
m_user_agent
);
1456
}
else
{
1457
m_request
.
remove_header
(
"User-Agent"
);
1458
}
1459
}
1460
1461
m_handshake_buffer
=
m_request
.
raw
();
1462
1463
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
1464
m_alog
.
write
(
log
::
alevel
::
devel
,
"Raw Handshake request:\n"
+
m_handshake_buffer
);
1465
}
1466
1467
if
(
m_open_handshake_timeout_dur
> 0) {
1468
m_handshake_timer
=
transport_con_type
::
set_timer
(
1469
m_open_handshake_timeout_dur
,
1470
lib
::
bind
(
1471
&
type
::
handle_open_handshake_timeout
,
1472
type
::
get_shared
(),
1473
lib
::
placeholders
::
_1
1474
)
1475
);
1476
}
1477
1478
transport_con_type
::
async_write
(
1479
m_handshake_buffer
.
data
(),
1480
m_handshake_buffer
.
size
(),
1481
lib
::
bind
(
1482
&
type
::
handle_send_http_request
,
1483
type
::
get_shared
(),
1484
lib
::
placeholders
::
_1
1485
)
1486
);
1487
}
1488
1489
template
<
typename
config
>
1490
void
connection
<
config
>::
handle_send_http_request
(
lib
::
error_code
const
&
ec
) {
1491
m_alog
.
write
(
log
::
alevel
::
devel
,
"handle_send_http_request"
);
1492
1493
lib
::
error_code
ecm
=
ec
;
1494
1495
if
(!
ecm
) {
1496
scoped_lock_type
lock
(
m_connection_state_lock
);
1497
1498
if
(
m_state
==
session
::
state
::
connecting
) {
1499
if
(
m_internal_state
!=
istate
::
WRITE_HTTP_REQUEST
) {
1500
ecm
=
error
::
make_error_code
(
error
::
invalid_state
);
1501
}
else
{
1502
m_internal_state
=
istate
::
READ_HTTP_RESPONSE
;
1503
}
1504
}
else
if
(
m_state
==
session
::
state
::
closed
) {
1505
// The connection was canceled while the response was being sent,
1506
// usually by the handshake timer. This is basically expected
1507
// (though hopefully rare) and there is nothing we can do so ignore.
1508
m_alog
.
write
(
log
::
alevel
::
devel
,
1509
"handle_send_http_request invoked after connection was closed"
);
1510
return
;
1511
}
else
{
1512
ecm
=
error
::
make_error_code
(
error
::
invalid_state
);
1513
}
1514
}
1515
1516
if
(
ecm
) {
1517
if
(
ecm
==
transport
::
error
::
eof
&&
m_state
==
session
::
state
::
closed
) {
1518
// we expect to get eof if the connection is closed already
1519
m_alog
.
write
(
log
::
alevel
::
devel
,
1520
"got (expected) eof/state error from closed con"
);
1521
return
;
1522
}
1523
1524
log_err
(
log
::
elevel
::
rerror
,
"handle_send_http_request"
,
ecm
);
1525
this
->
terminate
(
ecm
);
1526
return
;
1527
}
1528
1529
transport_con_type
::
async_read_at_least
(
1530
1,
1531
m_buf
,
1532
config
::
connection_read_buffer_size
,
1533
lib
::
bind
(
1534
&
type
::
handle_read_http_response
,
1535
type
::
get_shared
(),
1536
lib
::
placeholders
::
_1
,
1537
lib
::
placeholders
::
_2
1538
)
1539
);
1540
}
1541
1542
template
<
typename
config
>
1543
void
connection
<
config
>::
handle_read_http_response
(
lib
::
error_code
const
&
ec
,
1544
size_t
bytes_transferred
)
1545
{
1546
m_alog
.
write
(
log
::
alevel
::
devel
,
"handle_read_http_response"
);
1547
1548
lib
::
error_code
ecm
=
ec
;
1549
1550
if
(!
ecm
) {
1551
scoped_lock_type
lock
(
m_connection_state_lock
);
1552
1553
if
(
m_state
==
session
::
state
::
connecting
) {
1554
if
(
m_internal_state
!=
istate
::
READ_HTTP_RESPONSE
) {
1555
ecm
=
error
::
make_error_code
(
error
::
invalid_state
);
1556
}
1557
}
else
if
(
m_state
==
session
::
state
::
closed
) {
1558
// The connection was canceled while the response was being sent,
1559
// usually by the handshake timer. This is basically expected
1560
// (though hopefully rare) and there is nothing we can do so ignore.
1561
m_alog
.
write
(
log
::
alevel
::
devel
,
1562
"handle_read_http_response invoked after connection was closed"
);
1563
return
;
1564
}
else
{
1565
ecm
=
error
::
make_error_code
(
error
::
invalid_state
);
1566
}
1567
}
1568
1569
if
(
ecm
) {
1570
if
(
ecm
==
transport
::
error
::
eof
&&
m_state
==
session
::
state
::
closed
) {
1571
// we expect to get eof if the connection is closed already
1572
m_alog
.
write
(
log
::
alevel
::
devel
,
1573
"got (expected) eof/state error from closed con"
);
1574
return
;
1575
}
1576
1577
log_err
(
log
::
elevel
::
rerror
,
"handle_read_http_response"
,
ecm
);
1578
this
->
terminate
(
ecm
);
1579
return
;
1580
}
1581
1582
size_t
bytes_processed
= 0;
1583
// TODO: refactor this to use error codes rather than exceptions
1584
try
{
1585
bytes_processed
=
m_response
.
consume
(
m_buf
,
bytes_transferred
);
1586
}
catch
(
http
::
exception
&
e
) {
1587
m_elog
.
write
(
log
::
elevel
::
rerror
,
1588
std
::
string
(
"error in handle_read_http_response: "
)+
e
.
what
());
1589
this
->
terminate
(
make_error_code
(
error
::
general
));
1590
return
;
1591
}
1592
1593
m_alog
.
write
(
log
::
alevel
::
devel
,
std
::
string
(
"Raw response: "
)+
m_response
.
raw
());
1594
1595
if
(
m_response
.
headers_ready
()) {
1596
if
(
m_handshake_timer
) {
1597
m_handshake_timer
->
cancel
();
1598
m_handshake_timer
.
reset
();
1599
}
1600
1601
lib
::
error_code
validate_ec
=
m_processor
->
validate_server_handshake_response
(
1602
m_request
,
1603
m_response
1604
);
1605
if
(
validate_ec
) {
1606
log_err
(
log
::
elevel
::
rerror
,
"Server handshake response"
,
validate_ec
);
1607
this
->
terminate
(
validate_ec
);
1608
return
;
1609
}
1610
1611
// Read extension parameters and set up values necessary for the end
1612
// user to complete extension negotiation.
1613
std
::
pair
<
lib
::
error_code
,
std
::
string
>
neg_results
;
1614
neg_results
=
m_processor
->
negotiate_extensions
(
m_response
);
1615
1616
if
(
neg_results
.
first
) {
1617
// There was a fatal error in extension negotiation. For the moment
1618
// kill all connections that fail extension negotiation.
1619
1620
// TODO: deal with cases where the response is well formed but
1621
// doesn't match the options requested by the client. Its possible
1622
// that the best behavior in this cases is to log and continue with
1623
// an unextended connection.
1624
m_alog
.
write
(
log
::
alevel
::
devel
,
"Extension negotiation failed: "
1625
+
neg_results
.
first
.
message
());
1626
this
->
terminate
(
make_error_code
(
error
::
extension_neg_failed
));
1627
// TODO: close connection with reason 1010 (and list extensions)
1628
}
1629
1630
// response is valid, connection can now be assumed to be open
1631
m_internal_state
=
istate
::
PROCESS_CONNECTION
;
1632
m_state
=
session
::
state
::
open
;
1633
1634
this
->
log_open_result
();
1635
1636
if
(
m_open_handler
) {
1637
m_open_handler
(
m_connection_hdl
);
1638
}
1639
1640
// The remaining bytes in m_buf are frame data. Copy them to the
1641
// beginning of the buffer and note the length. They will be read after
1642
// the handshake completes and before more bytes are read.
1643
std
::
copy
(
m_buf
+
bytes_processed
,
m_buf
+
bytes_transferred
,
m_buf
);
1644
m_buf_cursor
=
bytes_transferred
-
bytes_processed
;
1645
1646
this
->
handle_read_frame
(
lib
::
error_code
(),
m_buf_cursor
);
1647
}
else
{
1648
transport_con_type
::
async_read_at_least
(
1649
1,
1650
m_buf
,
1651
config
::
connection_read_buffer_size
,
1652
lib
::
bind
(
1653
&
type
::
handle_read_http_response
,
1654
type
::
get_shared
(),
1655
lib
::
placeholders
::
_1
,
1656
lib
::
placeholders
::
_2
1657
)
1658
);
1659
}
1660
}
1661
1662
template
<
typename
config
>
1663
void
connection
<
config
>::
handle_open_handshake_timeout
(
1664
lib
::
error_code
const
&
ec
)
1665
{
1666
if
(
ec
==
transport
::
error
::
operation_aborted
) {
1667
m_alog
.
write
(
log
::
alevel
::
devel
,
"open handshake timer cancelled"
);
1668
}
else
if
(
ec
) {
1669
m_alog
.
write
(
log
::
alevel
::
devel
,
1670
"open handle_open_handshake_timeout error: "
+
ec
.
message
());
1671
// TODO: ignore or fail here?
1672
}
else
{
1673
m_alog
.
write
(
log
::
alevel
::
devel
,
"open handshake timer expired"
);
1674
terminate
(
make_error_code
(
error
::
open_handshake_timeout
));
1675
}
1676
}
1677
1678
template
<
typename
config
>
1679
void
connection
<
config
>::
handle_close_handshake_timeout
(
1680
lib
::
error_code
const
&
ec
)
1681
{
1682
if
(
ec
==
transport
::
error
::
operation_aborted
) {
1683
m_alog
.
write
(
log
::
alevel
::
devel
,
"asio close handshake timer cancelled"
);
1684
}
else
if
(
ec
) {
1685
m_alog
.
write
(
log
::
alevel
::
devel
,
1686
"asio open handle_close_handshake_timeout error: "
+
ec
.
message
());
1687
// TODO: ignore or fail here?
1688
}
else
{
1689
m_alog
.
write
(
log
::
alevel
::
devel
,
"asio close handshake timer expired"
);
1690
terminate
(
make_error_code
(
error
::
close_handshake_timeout
));
1691
}
1692
}
1693
1694
template
<
typename
config
>
1695
void
connection
<
config
>::
terminate
(
lib
::
error_code
const
&
ec
) {
1696
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
1697
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection terminate"
);
1698
}
1699
1700
// Cancel close handshake timer
1701
if
(
m_handshake_timer
) {
1702
m_handshake_timer
->
cancel
();
1703
m_handshake_timer
.
reset
();
1704
}
1705
1706
terminate_status
tstat
=
unknown
;
1707
if
(
ec
) {
1708
m_ec
=
ec
;
1709
m_local_close_code
=
close
::
status
::
abnormal_close
;
1710
m_local_close_reason
=
ec
.
message
();
1711
}
1712
1713
// TODO: does any of this need a mutex?
1714
if
(
m_is_http
) {
1715
m_http_state
=
session
::
http_state
::
closed
;
1716
}
1717
if
(
m_state
==
session
::
state
::
connecting
) {
1718
m_state
=
session
::
state
::
closed
;
1719
tstat
=
failed
;
1720
1721
// Log fail result here before socket is shut down and we can't get
1722
// the remote address, etc anymore
1723
if
(
m_ec
!=
error
::
http_connection_ended
) {
1724
log_fail_result
();
1725
}
1726
}
else
if
(
m_state
!=
session
::
state
::
closed
) {
1727
m_state
=
session
::
state
::
closed
;
1728
tstat
=
closed
;
1729
}
else
{
1730
m_alog
.
write
(
log
::
alevel
::
devel
,
1731
"terminate called on connection that was already terminated"
);
1732
return
;
1733
}
1734
1735
// TODO: choose between shutdown and close based on error code sent
1736
1737
transport_con_type
::
async_shutdown
(
1738
lib
::
bind
(
1739
&
type
::
handle_terminate
,
1740
type
::
get_shared
(),
1741
tstat
,
1742
lib
::
placeholders
::
_1
1743
)
1744
);
1745
}
1746
1747
template
<
typename
config
>
1748
void
connection
<
config
>::
handle_terminate
(
terminate_status
tstat
,
1749
lib
::
error_code
const
&
ec
)
1750
{
1751
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
1752
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection handle_terminate"
);
1753
}
1754
1755
if
(
ec
) {
1756
// there was an error actually shutting down the connection
1757
log_err
(
log
::
elevel
::
devel
,
"handle_terminate"
,
ec
);
1758
}
1759
1760
// clean shutdown
1761
if
(
tstat
==
failed
) {
1762
if
(
m_ec
!=
error
::
http_connection_ended
) {
1763
if
(
m_fail_handler
) {
1764
m_fail_handler
(
m_connection_hdl
);
1765
}
1766
}
1767
}
else
if
(
tstat
==
closed
) {
1768
if
(
m_close_handler
) {
1769
m_close_handler
(
m_connection_hdl
);
1770
}
1771
log_close_result
();
1772
}
else
{
1773
m_elog
.
write
(
log
::
elevel
::
rerror
,
"Unknown terminate_status"
);
1774
}
1775
1776
// call the termination handler if it exists
1777
// if it exists it might (but shouldn't) refer to a bad memory location.
1778
// If it does, we don't care and should catch and ignore it.
1779
if
(
m_termination_handler
) {
1780
try
{
1781
m_termination_handler
(
type
::
get_shared
());
1782
}
catch
(
std
::
exception
const
&
e
) {
1783
m_elog
.
write
(
log
::
elevel
::
warn
,
1784
std
::
string
(
"termination_handler call failed. Reason was: "
)+
e
.
what
());
1785
}
1786
}
1787
}
1788
1789
template
<
typename
config
>
1790
void
connection
<
config
>::
write_frame
() {
1791
//m_alog.write(log::alevel::devel,"connection write_frame");
1792
1793
{
1794
scoped_lock_type
lock
(
m_write_lock
);
1795
1796
// Check the write flag. If true, there is an outstanding transport
1797
// write already. In this case we just return. The write handler will
1798
// start a new write if the write queue isn't empty. If false, we set
1799
// the write flag and proceed to initiate a transport write.
1800
if
(
m_write_flag
) {
1801
return
;
1802
}
1803
1804
// pull off all the messages that are ready to write.
1805
// stop if we get a message marked terminal
1806
message_ptr
next_message
=
write_pop
();
1807
while
(
next_message
) {
1808
m_current_msgs
.
push_back
(
next_message
);
1809
if
(!
next_message
->
get_terminal
()) {
1810
next_message
=
write_pop
();
1811
}
else
{
1812
next_message
=
message_ptr
();
1813
}
1814
}
1815
1816
if
(
m_current_msgs
.
empty
()) {
1817
// there was nothing to send
1818
return
;
1819
}
else
{
1820
// At this point we own the next messages to be sent and are
1821
// responsible for holding the write flag until they are
1822
// successfully sent or there is some error
1823
m_write_flag
=
true
;
1824
}
1825
}
1826
1827
typename
std
::
vector
<
message_ptr
>::
iterator
it
;
1828
for
(
it
=
m_current_msgs
.
begin
();
it
!=
m_current_msgs
.
end
(); ++
it
) {
1829
std
::
string
const
&
header
= (*
it
)->
get_header
();
1830
std
::
string
const
&
payload
= (*
it
)->
get_payload
();
1831
1832
m_send_buffer
.
push_back
(
transport
::
buffer
(
header
.
c_str
(),
header
.
size
()));
1833
m_send_buffer
.
push_back
(
transport
::
buffer
(
payload
.
c_str
(),
payload
.
size
()));
1834
}
1835
1836
// Print detailed send stats if those log levels are enabled
1837
if
(
m_alog
.
static_test
(
log
::
alevel
::
frame_header
)) {
1838
if
(
m_alog
.
dynamic_test
(
log
::
alevel
::
frame_header
)) {
1839
std
::
stringstream
general
,
header
,
payload
;
1840
1841
general
<<
"Dispatching write containing "
<<
m_current_msgs
.
size
()
1842
<<
" message(s) containing "
;
1843
header
<<
"Header Bytes: \n"
;
1844
payload
<<
"Payload Bytes: \n"
;
1845
1846
size_t
hbytes
= 0;
1847
size_t
pbytes
= 0;
1848
1849
for
(
size_t
i
= 0;
i
<
m_current_msgs
.
size
();
i
++) {
1850
hbytes
+=
m_current_msgs
[
i
]->
get_header
().
size
();
1851
pbytes
+=
m_current_msgs
[
i
]->
get_payload
().
size
();
1852
1853
1854
header
<<
"["
<<
i
<<
"] ("
1855
<<
m_current_msgs
[
i
]->
get_header
().
size
() <<
") "
1856
<<
utility
::
to_hex
(
m_current_msgs
[
i
]->
get_header
()) <<
"\n"
;
1857
1858
if
(
m_alog
.
static_test
(
log
::
alevel
::
frame_payload
)) {
1859
if
(
m_alog
.
dynamic_test
(
log
::
alevel
::
frame_payload
)) {
1860
payload
<<
"["
<<
i
<<
"] ("
1861
<<
m_current_msgs
[
i
]->
get_payload
().
size
() <<
") ["
<<
m_current_msgs
[
i
]->
get_opcode
()<<
"] "
1862
<< (
m_current_msgs
[
i
]->
get_opcode
() ==
frame
::
opcode
::
text
?
1863
m_current_msgs
[
i
]->
get_payload
() :
1864
utility
::
to_hex
(
m_current_msgs
[
i
]->
get_payload
())
1865
)
1866
<<
"\n"
;
1867
}
1868
}
1869
}
1870
1871
general
<<
hbytes
<<
" header bytes and "
<<
pbytes
<<
" payload bytes"
;
1872
1873
m_alog
.
write
(
log
::
alevel
::
frame_header
,
general
.
str
());
1874
m_alog
.
write
(
log
::
alevel
::
frame_header
,
header
.
str
());
1875
m_alog
.
write
(
log
::
alevel
::
frame_payload
,
payload
.
str
());
1876
}
1877
}
1878
1879
transport_con_type
::
async_write
(
1880
m_send_buffer
,
1881
m_write_frame_handler
1882
);
1883
}
1884
1885
template
<
typename
config
>
1886
void
connection
<
config
>::
handle_write_frame
(
lib
::
error_code
const
&
ec
)
1887
{
1888
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
1889
m_alog
.
write
(
log
::
alevel
::
devel
,
"connection handle_write_frame"
);
1890
}
1891
1892
bool
terminal
=
m_current_msgs
.
back
()->
get_terminal
();
1893
1894
m_send_buffer
.
clear
();
1895
m_current_msgs
.
clear
();
1896
// TODO: recycle instead of deleting
1897
1898
if
(
ec
) {
1899
log_err
(
log
::
elevel
::
fatal
,
"handle_write_frame"
,
ec
);
1900
this
->
terminate
(
ec
);
1901
return
;
1902
}
1903
1904
if
(
terminal
) {
1905
this
->
terminate
(
lib
::
error_code
());
1906
return
;
1907
}
1908
1909
bool
needs_writing
=
false
;
1910
{
1911
scoped_lock_type
lock
(
m_write_lock
);
1912
1913
// release write flag
1914
m_write_flag
=
false
;
1915
1916
needs_writing
= !
m_send_queue
.
empty
();
1917
}
1918
1919
if
(
needs_writing
) {
1920
transport_con_type
::
dispatch
(
lib
::
bind
(
1921
&
type
::
write_frame
,
1922
type
::
get_shared
()
1923
));
1924
}
1925
}
1926
1927
template
<
typename
config
>
1928
std
::
vector
<
int
>
const
&
connection
<
config
>::
get_supported_versions
()
const
1929
{
1930
return
versions_supported
;
1931
}
1932
1933
template
<
typename
config
>
1934
void
connection
<
config
>::
process_control_frame
(
typename
config
::
message_type
::
ptr
msg
)
1935
{
1936
m_alog
.
write
(
log
::
alevel
::
devel
,
"process_control_frame"
);
1937
1938
frame
::
opcode
::
value
op
=
msg
->
get_opcode
();
1939
lib
::
error_code
ec
;
1940
1941
std
::
stringstream
s
;
1942
s
<<
"Control frame received with opcode "
<<
op
;
1943
m_alog
.
write
(
log
::
alevel
::
control
,
s
.
str
());
1944
1945
if
(
m_state
==
session
::
state
::
closed
) {
1946
m_elog
.
write
(
log
::
elevel
::
warn
,
"got frame in state closed"
);
1947
return
;
1948
}
1949
if
(
op
!=
frame
::
opcode
::
CLOSE
&&
m_state
!=
session
::
state
::
open
) {
1950
m_elog
.
write
(
log
::
elevel
::
warn
,
"got non-close frame in state closing"
);
1951
return
;
1952
}
1953
1954
if
(
op
==
frame
::
opcode
::
PING
) {
1955
bool
should_reply
=
true
;
1956
1957
if
(
m_ping_handler
) {
1958
should_reply
=
m_ping_handler
(
m_connection_hdl
,
msg
->
get_payload
());
1959
}
1960
1961
if
(
should_reply
) {
1962
this
->
pong
(
msg
->
get_payload
(),
ec
);
1963
if
(
ec
) {
1964
log_err
(
log
::
elevel
::
devel
,
"Failed to send response pong"
,
ec
);
1965
}
1966
}
1967
}
else
if
(
op
==
frame
::
opcode
::
PONG
) {
1968
if
(
m_pong_handler
) {
1969
m_pong_handler
(
m_connection_hdl
,
msg
->
get_payload
());
1970
}
1971
if
(
m_ping_timer
) {
1972
m_ping_timer
->
cancel
();
1973
}
1974
}
else
if
(
op
==
frame
::
opcode
::
CLOSE
) {
1975
m_alog
.
write
(
log
::
alevel
::
devel
,
"got close frame"
);
1976
// record close code and reason somewhere
1977
1978
m_remote_close_code
=
close
::
extract_code
(
msg
->
get_payload
(),
ec
);
1979
if
(
ec
) {
1980
s
.
str
(
""
);
1981
if
(
config
::
drop_on_protocol_error
) {
1982
s
<<
"Received invalid close code "
<<
m_remote_close_code
1983
<<
" dropping connection per config."
;
1984
m_elog
.
write
(
log
::
elevel
::
devel
,
s
.
str
());
1985
this
->
terminate
(
ec
);
1986
}
else
{
1987
s
<<
"Received invalid close code "
<<
m_remote_close_code
1988
<<
" sending acknowledgement and closing"
;
1989
m_elog
.
write
(
log
::
elevel
::
devel
,
s
.
str
());
1990
ec
=
send_close_ack
(
close
::
status
::
protocol_error
,
1991
"Invalid close code"
);
1992
if
(
ec
) {
1993
log_err
(
log
::
elevel
::
devel
,
"send_close_ack"
,
ec
);
1994
}
1995
}
1996
return
;
1997
}
1998
1999
m_remote_close_reason
=
close
::
extract_reason
(
msg
->
get_payload
(),
ec
);
2000
if
(
ec
) {
2001
if
(
config
::
drop_on_protocol_error
) {
2002
m_elog
.
write
(
log
::
elevel
::
devel
,
2003
"Received invalid close reason. Dropping connection per config"
);
2004
this
->
terminate
(
ec
);
2005
}
else
{
2006
m_elog
.
write
(
log
::
elevel
::
devel
,
2007
"Received invalid close reason. Sending acknowledgement and closing"
);
2008
ec
=
send_close_ack
(
close
::
status
::
protocol_error
,
2009
"Invalid close reason"
);
2010
if
(
ec
) {
2011
log_err
(
log
::
elevel
::
devel
,
"send_close_ack"
,
ec
);
2012
}
2013
}
2014
return
;
2015
}
2016
2017
if
(
m_state
==
session
::
state
::
open
) {
2018
s
.
str
(
""
);
2019
s
<<
"Received close frame with code "
<<
m_remote_close_code
2020
<<
" and reason "
<<
m_remote_close_reason
;
2021
m_alog
.
write
(
log
::
alevel
::
devel
,
s
.
str
());
2022
2023
ec
=
send_close_ack
();
2024
if
(
ec
) {
2025
log_err
(
log
::
elevel
::
devel
,
"send_close_ack"
,
ec
);
2026
}
2027
}
else
if
(
m_state
==
session
::
state
::
closing
&& !
m_was_clean
) {
2028
// ack of our close
2029
m_alog
.
write
(
log
::
alevel
::
devel
,
"Got acknowledgement of close"
);
2030
2031
m_was_clean
=
true
;
2032
2033
// If we are a server terminate the connection now. Clients should
2034
// leave the connection open to give the server an opportunity to
2035
// initiate the TCP close. The client's timer will handle closing
2036
// its side of the connection if the server misbehaves.
2037
//
2038
// TODO: different behavior if the underlying transport doesn't
2039
// support timers?
2040
if
(
m_is_server
) {
2041
terminate
(
lib
::
error_code
());
2042
}
2043
}
else
{
2044
// spurious, ignore
2045
m_elog
.
write
(
log
::
elevel
::
devel
,
"Got close frame in wrong state"
);
2046
}
2047
}
else
{
2048
// got an invalid control opcode
2049
m_elog
.
write
(
log
::
elevel
::
devel
,
"Got control frame with invalid opcode"
);
2050
// initiate protocol error shutdown
2051
}
2052
}
2053
2054
template
<
typename
config
>
2055
lib
::
error_code
connection
<
config
>::
send_close_ack
(
close
::
status
::
value
code
,
2056
std
::
string
const
&
reason
)
2057
{
2058
return
send_close_frame
(
code
,
reason
,
true
,
m_is_server
);
2059
}
2060
2061
template
<
typename
config
>
2062
lib
::
error_code
connection
<
config
>::
send_close_frame
(
close
::
status
::
value
code
,
2063
std
::
string
const
&
reason
,
bool
ack
,
bool
terminal
)
2064
{
2065
m_alog
.
write
(
log
::
alevel
::
devel
,
"send_close_frame"
);
2066
2067
// check for special codes
2068
2069
// If silent close is set, respect it and blank out close information
2070
// Otherwise use whatever has been specified in the parameters. If
2071
// parameters specifies close::status::blank then determine what to do
2072
// based on whether or not this is an ack. If it is not an ack just
2073
// send blank info. If it is an ack then echo the close information from
2074
// the remote endpoint.
2075
if
(
config
::
silent_close
) {
2076
m_alog
.
write
(
log
::
alevel
::
devel
,
"closing silently"
);
2077
m_local_close_code
=
close
::
status
::
no_status
;
2078
m_local_close_reason
.
clear
();
2079
}
else
if
(
code
!=
close
::
status
::
blank
) {
2080
m_alog
.
write
(
log
::
alevel
::
devel
,
"closing with specified codes"
);
2081
m_local_close_code
=
code
;
2082
m_local_close_reason
=
reason
;
2083
}
else
if
(!
ack
) {
2084
m_alog
.
write
(
log
::
alevel
::
devel
,
"closing with no status code"
);
2085
m_local_close_code
=
close
::
status
::
no_status
;
2086
m_local_close_reason
.
clear
();
2087
}
else
if
(
m_remote_close_code
==
close
::
status
::
no_status
) {
2088
m_alog
.
write
(
log
::
alevel
::
devel
,
2089
"acknowledging a no-status close with normal code"
);
2090
m_local_close_code
=
close
::
status
::
normal
;
2091
m_local_close_reason
.
clear
();
2092
}
else
{
2093
m_alog
.
write
(
log
::
alevel
::
devel
,
"acknowledging with remote codes"
);
2094
m_local_close_code
=
m_remote_close_code
;
2095
m_local_close_reason
=
m_remote_close_reason
;
2096
}
2097
2098
std
::
stringstream
s
;
2099
s
<<
"Closing with code: "
<<
m_local_close_code
<<
", and reason: "
2100
<<
m_local_close_reason
;
2101
m_alog
.
write
(
log
::
alevel
::
devel
,
s
.
str
());
2102
2103
message_ptr
msg
=
m_msg_manager
->
get_message
();
2104
if
(!
msg
) {
2105
return
error
::
make_error_code
(
error
::
no_outgoing_buffers
);
2106
}
2107
2108
lib
::
error_code
ec
=
m_processor
->
prepare_close
(
m_local_close_code
,
2109
m_local_close_reason
,
msg
);
2110
if
(
ec
) {
2111
return
ec
;
2112
}
2113
2114
// Messages flagged terminal will result in the TCP connection being dropped
2115
// after the message has been written. This is typically used when servers
2116
// send an ack and when any endpoint encounters a protocol error
2117
if
(
terminal
) {
2118
msg
->
set_terminal
(
true
);
2119
}
2120
2121
m_state
=
session
::
state
::
closing
;
2122
2123
if
(
ack
) {
2124
m_was_clean
=
true
;
2125
}
2126
2127
// Start a timer so we don't wait forever for the acknowledgement close
2128
// frame
2129
if
(
m_close_handshake_timeout_dur
> 0) {
2130
m_handshake_timer
=
transport_con_type
::
set_timer
(
2131
m_close_handshake_timeout_dur
,
2132
lib
::
bind
(
2133
&
type
::
handle_close_handshake_timeout
,
2134
type
::
get_shared
(),
2135
lib
::
placeholders
::
_1
2136
)
2137
);
2138
}
2139
2140
bool
needs_writing
=
false
;
2141
{
2142
scoped_lock_type
lock
(
m_write_lock
);
2143
write_push
(
msg
);
2144
needs_writing
= !
m_write_flag
&& !
m_send_queue
.
empty
();
2145
}
2146
2147
if
(
needs_writing
) {
2148
transport_con_type
::
dispatch
(
lib
::
bind
(
2149
&
type
::
write_frame
,
2150
type
::
get_shared
()
2151
));
2152
}
2153
2154
return
lib
::
error_code
();
2155
}
2156
2157
template
<
typename
config
>
2158
typename
connection
<
config
>::
processor_ptr
2159
connection
<
config
>::
get_processor
(
int
version
)
const
{
2160
// TODO: allow disabling certain versions
2161
2162
processor_ptr
p
;
2163
2164
switch
(
version
) {
2165
case
0:
2166
p
=
lib
::
make_shared
<
processor
::
hybi00
<
config
> >(
2167
transport_con_type
::
is_secure
(),
2168
m_is_server
,
2169
m_msg_manager
2170
);
2171
break
;
2172
case
7:
2173
p
=
lib
::
make_shared
<
processor
::
hybi07
<
config
> >(
2174
transport_con_type
::
is_secure
(),
2175
m_is_server
,
2176
m_msg_manager
,
2177
lib
::
ref
(
m_rng
)
2178
);
2179
break
;
2180
case
8:
2181
p
=
lib
::
make_shared
<
processor
::
hybi08
<
config
> >(
2182
transport_con_type
::
is_secure
(),
2183
m_is_server
,
2184
m_msg_manager
,
2185
lib
::
ref
(
m_rng
)
2186
);
2187
break
;
2188
case
13:
2189
p
=
lib
::
make_shared
<
processor
::
hybi13
<
config
> >(
2190
transport_con_type
::
is_secure
(),
2191
m_is_server
,
2192
m_msg_manager
,
2193
lib
::
ref
(
m_rng
)
2194
);
2195
break
;
2196
default
:
2197
return
p
;
2198
}
2199
2200
// Settings not configured by the constructor
2201
p
->
set_max_message_size
(
m_max_message_size
);
2202
2203
return
p
;
2204
}
2205
2206
template
<
typename
config
>
2207
void
connection
<
config
>::
write_push
(
typename
config
::
message_type
::
ptr
msg
)
2208
{
2209
if
(!
msg
) {
2210
return
;
2211
}
2212
2213
m_send_buffer_size
+=
msg
->
get_payload
().
size
();
2214
m_send_queue
.
push
(
msg
);
2215
2216
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
2217
std
::
stringstream
s
;
2218
s
<<
"write_push: message count: "
<<
m_send_queue
.
size
()
2219
<<
" buffer size: "
<<
m_send_buffer_size
;
2220
m_alog
.
write
(
log
::
alevel
::
devel
,
s
.
str
());
2221
}
2222
}
2223
2224
template
<
typename
config
>
2225
typename
config
::
message_type
::
ptr
connection
<
config
>::
write_pop
()
2226
{
2227
message_ptr
msg
;
2228
2229
if
(
m_send_queue
.
empty
()) {
2230
return
msg
;
2231
}
2232
2233
msg
=
m_send_queue
.
front
();
2234
2235
m_send_buffer_size
-=
msg
->
get_payload
().
size
();
2236
m_send_queue
.
pop
();
2237
2238
if
(
m_alog
.
static_test
(
log
::
alevel
::
devel
)) {
2239
std
::
stringstream
s
;
2240
s
<<
"write_pop: message count: "
<<
m_send_queue
.
size
()
2241
<<
" buffer size: "
<<
m_send_buffer_size
;
2242
m_alog
.
write
(
log
::
alevel
::
devel
,
s
.
str
());
2243
}
2244
return
msg
;
2245
}
2246
2247
template
<
typename
config
>
2248
void
connection
<
config
>::
log_open_result
()
2249
{
2250
std
::
stringstream
s
;
2251
2252
int
version
;
2253
if
(!
processor
::
is_websocket_handshake
(
m_request
)) {
2254
version
= -1;
2255
}
else
{
2256
version
=
processor
::
get_websocket_version
(
m_request
);
2257
}
2258
2259
// Connection Type
2260
s
<< (
version
== -1 ?
"HTTP"
:
"WebSocket"
) <<
" Connection "
;
2261
2262
// Remote endpoint address
2263
s
<<
transport_con_type
::
get_remote_endpoint
() <<
" "
;
2264
2265
// Version string if WebSocket
2266
if
(
version
!= -1) {
2267
s
<<
"v"
<<
version
<<
" "
;
2268
}
2269
2270
// User Agent
2271
std
::
string
ua
=
m_request
.
get_header
(
"User-Agent"
);
2272
if
(
ua
.
empty
()) {
2273
s
<<
"\"\" "
;
2274
}
else
{
2275
// check if there are any quotes in the user agent
2276
s
<<
"\""
<<
utility
::
string_replace_all
(
ua
,
"\""
,
"\\\""
) <<
"\" "
;
2277
}
2278
2279
// URI
2280
s
<< (
m_uri
?
m_uri
->
get_resource
() :
"NULL"
) <<
" "
;
2281
2282
// Status code
2283
s
<<
m_response
.
get_status_code
();
2284
2285
m_alog
.
write
(
log
::
alevel
::
connect
,
s
.
str
());
2286
}
2287
2288
template
<
typename
config
>
2289
void
connection
<
config
>::
log_close_result
()
2290
{
2291
std
::
stringstream
s
;
2292
2293
s
<<
"Disconnect "
2294
<<
"close local:["
<<
m_local_close_code
2295
<< (
m_local_close_reason
.
empty
() ?
""
:
","
+
m_local_close_reason
)
2296
<<
"] remote:["
<<
m_remote_close_code
2297
<< (
m_remote_close_reason
.
empty
() ?
""
:
","
+
m_remote_close_reason
) <<
"]"
;
2298
2299
m_alog
.
write
(
log
::
alevel
::
disconnect
,
s
.
str
());
2300
}
2301
2302
template
<
typename
config
>
2303
void
connection
<
config
>::
log_fail_result
()
2304
{
2305
std
::
stringstream
s
;
2306
2307
int
version
=
processor
::
get_websocket_version
(
m_request
);
2308
2309
// Connection Type
2310
s
<<
"WebSocket Connection "
;
2311
2312
// Remote endpoint address & WebSocket version
2313
s
<<
transport_con_type
::
get_remote_endpoint
();
2314
if
(
version
< 0) {
2315
s
<<
" -"
;
2316
}
else
{
2317
s
<<
" v"
<<
version
;
2318
}
2319
2320
// User Agent
2321
std
::
string
ua
=
m_request
.
get_header
(
"User-Agent"
);
2322
if
(
ua
.
empty
()) {
2323
s
<<
" \"\" "
;
2324
}
else
{
2325
// check if there are any quotes in the user agent
2326
s
<<
" \""
<<
utility
::
string_replace_all
(
ua
,
"\""
,
"\\\""
) <<
"\" "
;
2327
}
2328
2329
// URI
2330
s
<< (
m_uri
?
m_uri
->
get_resource
() :
"-"
);
2331
2332
// HTTP Status code
2333
s
<<
" "
<<
m_response
.
get_status_code
();
2334
2335
// WebSocket++ error code & reason
2336
s
<<
" "
<<
m_ec
<<
" "
<<
m_ec
.
message
();
2337
2338
m_alog
.
write
(
log
::
alevel
::
fail
,
s
.
str
());
2339
}
2340
2341
template
<
typename
config
>
2342
void
connection
<
config
>::
log_http_result
() {
2343
std
::
stringstream
s
;
2344
2345
if
(
processor
::
is_websocket_handshake
(
m_request
)) {
2346
m_alog
.
write
(
log
::
alevel
::
devel
,
"Call to log_http_result for WebSocket"
);
2347
return
;
2348
}
2349
2350
// Connection Type
2351
s
<< (
m_request
.
get_header
(
"host"
).
empty
() ?
"-"
:
m_request
.
get_header
(
"host"
))
2352
<<
" "
<<
transport_con_type
::
get_remote_endpoint
()
2353
<<
" \""
<<
m_request
.
get_method
()
2354
<<
" "
<< (
m_uri
?
m_uri
->
get_resource
() :
"-"
)
2355
<<
" "
<<
m_request
.
get_version
() <<
"\" "
<<
m_response
.
get_status_code
()
2356
<<
" "
<<
m_response
.
get_body
().
size
();
2357
2358
// User Agent
2359
std
::
string
ua
=
m_request
.
get_header
(
"User-Agent"
);
2360
if
(
ua
.
empty
()) {
2361
s
<<
" \"\" "
;
2362
}
else
{
2363
// check if there are any quotes in the user agent
2364
s
<<
" \""
<<
utility
::
string_replace_all
(
ua
,
"\""
,
"\\\""
) <<
"\" "
;
2365
}
2366
2367
m_alog
.
write
(
log
::
alevel
::
http
,
s
.
str
());
2368
}
2369
2370
}
// namespace websocketpp
2371
2372
#
endif
// WEBSOCKETPP_CONNECTION_IMPL_HPP
websocketpp::server::handle_accept
void handle_accept(connection_ptr con, lib::error_code const &ec)
Handler callback for start_accept.
Definition:
server_endpoint.hpp:161
Generated by
1.8.11