2121import time
2222import uuid
2323
24+ import requests
25+ from requests .auth import HTTPBasicAuth
2426import six
2527from six .moves import range
2628import urllib3
@@ -172,7 +174,6 @@ def alias(self, alias_id, original, meta=None):
172174 Calling this method *always* results in a synchronous HTTP request
173175 to Mixpanel servers, regardless of any custom consumer.
174176 """
175- sync_consumer = Consumer ()
176177 event = {
177178 'event' : '$create_alias' ,
178179 'properties' : {
@@ -183,6 +184,8 @@ def alias(self, alias_id, original, meta=None):
183184 }
184185 if meta :
185186 event .update (meta )
187+
188+ sync_consumer = Consumer ()
186189 sync_consumer .send ('events' , json_dumps (event , cls = self ._serializer ))
187190
188191 def merge (self , api_key , distinct_id1 , distinct_id2 , meta = None , api_secret = None ):
@@ -540,7 +543,7 @@ class Consumer(object):
540543
541544 def __init__ (self , events_url = None , people_url = None , import_url = None ,
542545 request_timeout = None , groups_url = None , api_host = "api.mixpanel.com" ,
543- retry_limit = 4 , retry_backoff_factor = 0.25 , verify_cert = False ):
546+ retry_limit = 4 , retry_backoff_factor = 0.25 , verify_cert = True ):
544547 # TODO: With next major version, make the above args kwarg-only, and reorder them.
545548 self ._endpoints = {
546549 'events' : events_url or 'https://{}/track' .format (api_host ),
@@ -549,31 +552,28 @@ def __init__(self, events_url=None, people_url=None, import_url=None,
549552 'imports' : import_url or 'https://{}/import' .format (api_host ),
550553 }
551554
552- retry_args = {
553- "total" : retry_limit ,
554- "backoff_factor" : retry_backoff_factor ,
555- "status_forcelist" : set (range (500 , 600 )),
556- }
555+ self ._verify_cert = verify_cert
556+ self ._request_timeout = request_timeout
557557
558558 # Work around renamed argument in urllib3.
559559 if hasattr (urllib3 .util .Retry .DEFAULT , "allowed_methods" ):
560560 methods_arg = "allowed_methods"
561561 else :
562562 methods_arg = "method_whitelist"
563563
564- retry_args [methods_arg ] = {"POST" }
565- retry_config = urllib3 .Retry (** retry_args )
566-
567- if not verify_cert :
568- urllib3 .disable_warnings (urllib3 .exceptions .InsecureRequestWarning )
569-
570- cert_reqs = 'CERT_REQUIRED' if verify_cert else 'CERT_NONE'
571- self ._http = urllib3 .PoolManager (
572- retries = retry_config ,
573- timeout = urllib3 .Timeout (request_timeout ),
574- cert_reqs = str (cert_reqs ),
564+ retry_args = {
565+ "total" : retry_limit ,
566+ "backoff_factor" : retry_backoff_factor ,
567+ "status_forcelist" : set (range (500 , 600 )),
568+ methods_arg : {"POST" },
569+ }
570+ adapter = requests .adapters .HTTPAdapter (
571+ max_retries = urllib3 .Retry (** retry_args ),
575572 )
576573
574+ self ._session = requests .Session ()
575+ self ._session .mount ('http' , adapter )
576+
577577 def send (self , endpoint , json_message , api_key = None , api_secret = None ):
578578 """Immediately record an event or a profile update.
579579
@@ -594,40 +594,38 @@ def send(self, endpoint, json_message, api_key=None, api_secret=None):
594594 self ._write_request (self ._endpoints [endpoint ], json_message , api_key , api_secret )
595595
596596 def _write_request (self , request_url , json_message , api_key = None , api_secret = None ):
597- data = {
598- 'data' : json_message ,
599- 'verbose' : 1 ,
600- 'ip' : 0 ,
601- }
602-
603597 if isinstance (api_key , tuple ):
604598 # For compatibility with subclassers, allow the auth details to be
605599 # packed into the existing api_key param.
606600 api_key , api_secret = api_key
607601
602+ params = {
603+ 'data' : json_message ,
604+ 'verbose' : 1 ,
605+ 'ip' : 0 ,
606+ }
608607 if api_key :
609- data .update ({'api_key' : api_key })
610-
611- headers = None
608+ params ['api_key' ] = api_key
612609
610+ basic_auth = None
613611 if api_secret is not None :
614- headers = urllib3 . util . make_headers ( basic_auth = "{}:" . format ( api_secret ) )
612+ basic_auth = HTTPBasicAuth ( api_secret , '' )
615613
616614 try :
617- response = self ._http .request (
618- 'POST' ,
615+ response = self ._session .post (
619616 request_url ,
620- fields = data ,
621- headers = headers ,
622- encode_multipart = False , # URL-encode payload in POST body.
617+ data = params ,
618+ auth = basic_auth ,
619+ timeout = self ._request_timeout ,
620+ verify = self ._verify_cert ,
623621 )
624622 except Exception as e :
625623 six .raise_from (MixpanelException (e ), e )
626624
627625 try :
628- response_dict = json . loads ( response .data . decode ( 'utf-8' ) )
626+ response_dict = response .json ( )
629627 except ValueError :
630- raise MixpanelException ('Cannot interpret Mixpanel server response: {0}' .format (response .data ))
628+ raise MixpanelException ('Cannot interpret Mixpanel server response: {0}' .format (response .text ))
631629
632630 if response_dict ['status' ] != 1 :
633631 raise MixpanelException ('Mixpanel error: {0}' .format (response_dict ['error' ]))
@@ -669,7 +667,7 @@ class BufferedConsumer(object):
669667 """
670668 def __init__ (self , max_size = 50 , events_url = None , people_url = None , import_url = None ,
671669 request_timeout = None , groups_url = None , api_host = "api.mixpanel.com" ,
672- retry_limit = 4 , retry_backoff_factor = 0.25 , verify_cert = False ):
670+ retry_limit = 4 , retry_backoff_factor = 0.25 , verify_cert = True ):
673671 self ._consumer = Consumer (events_url , people_url , import_url , request_timeout ,
674672 groups_url , api_host , retry_limit , retry_backoff_factor , verify_cert )
675673 self ._buffers = {
0 commit comments