Blog
Boto3とStripeをPythonでローカル実行
Posted over 2 years ago
この記事をシェア:
Virtualenvの立ち上げ
Pythonでの開発環境をたてます。
$ mkdir python_tasks
$ virtualenv --no-site-packages python_tasks
New python executable in /dev/python_tasks/python_tasks/bin/python
Installing setuptools, pip, wheel...done.
$ cd python_tasks
$ source bin/activate
(python_tasks) :python_tasks $
パッケージインストール
必要なパッケージをいれましょう。AWSリソースへのアクセスはboto3を、Stripeへのアクセスはstripeを追加します。
$ pip install --upgrade pip
$ pip install boto3
$ pip install stripe
$ pip list
Package Version
--------------- ---------
boto3 1.7.9
botocore 1.10.9
certifi 2018.4.16
chardet 3.0.4
docutils 0.14
futures 3.2.0
idna 2.6
jmespath 0.9.3
pip 10.0.1
python-dateutil 2.7.2
requests 2.18.4
s3transfer 0.1.13
setuptools 21.2.1
six 1.11.0
stripe 1.80.0
urllib3 1.22
wheel 0.29.0
以下のようなエラーが出た場合は、pipのバージョンを確認しましょう。pipのアップデートを忘れているケースだと思いますので、pip install --upgrade pipで解決します。
Collecting boto3
Could not fetch URL https://pypi.python.org/simple/boto3/: There was a problem confirming the ssl certificate: [SSL: TLSV1_ALERT_PROTOCOL_VERSION] tlsv1 alert protocol version (_ssl.c:590) - skipping
Could not find a version that satisfies the requirement boto3 (from versions: )
No matching distribution found for boto3
Pythonのスクリプトファイルを用意
実行するコードを記述しましょう。例えばCognito User Poolsでアカウントの有効化が完了しているユーザーのみ取得するコードであれば、以下のようになります。
import boto3
cognito = boto3.client('cognito-idp')
UserPoolId = 'us-east-1_XXXX'
def getUsers():
response = cognito.list_users(
UserPoolId=UserPoolId,
Filter="cognito:user_status='CONFIRMED'"
)
return response
users = getUsers()
print(users)
Stripeのカスタマーのメールアドレス更新であれば、以下のような書き方ができます。
import stripe
stripeKey = 'sk_test_XXXXXXXXX'
class Stripe:
def __init__(self, stripe):
self.stripe = stripe
def setApiKey(self, apiKey):
self.stripe.api_key = apiKey
def getCustomer(self, customerId):
return self.stripe.Customer.retrieve(customerId)
def updateCustomerEmail(self, customerId, email):
customer = self.getCustomer(customerId)
customer.email = email
customer.save()
stripeClass = Stripe(stripe)
stripeClass.setApiKey(stripeKey)
stripeClass.updateCustomerEmail('cus_xxxx', 'hoge@example.com')
応用:CognitoのメールアドレスをStripeの顧客に反映する
import boto3
import stripe
cognito = boto3.client('cognito-idp')
UserPoolId = 'us-east-1_XXXXX'
stripeKey = 'sk_test_XXXXX'
class Stripe:
def __init__(self, stripe):
self.stripe = stripe
def setApiKey(self, apiKey):
self.stripe.api_key = apiKey
def getCustomer(self, customerId):
return self.stripe.Customer.retrieve(customerId)
def updateCustomerEmail(self, customerId, email):
customer = self.getCustomer(customerId)
print(customer.email)
customer.email = email
customer.save()
def getUsers():
response = cognito.list_users(
UserPoolId=UserPoolId,
Filter="cognito:user_status='CONFIRMED'"
)
return response
def getUsersWithPagenation(PaginationToken):
response = cognito.list_users(
UserPoolId=UserPoolId,
PaginationToken=PaginationToken,
Filter="cognito:user_status='CONFIRMED'"
)
return response
class workflow:
def __init__(self, stripeApiKey):
stripeClass = Stripe(stripe)
stripeClass.setApiKey(stripeKey)
self.stripe = stripeClass
def syncEmailToStripeWorkflow(self, users):
self.syncEmailToStripe(users['Users'])
if 'PaginationToken' in users:
users = getUsersWithPagenation(users['PaginationToken'])
self.syncEmailToStripeWorkflow(users)
else:
print('end')
def getUserAttributes(self, attributes, name):
name_values = [x['Value'] for x in attributes if x['Name'] == name]
value = name_values[0] if len(name_values) else ''
return value
def runSyncEmailToStripe(self, user):
try :
stripeId = self.getUserAttributes(user['Attributes'], 'custom:stripeID')
if (not stripeId): return
email = self.getUserAttributes(user['Attributes'], 'email')
print(user['Username'])
print(email)
print(stripeId)
result = self.stripe.updateCustomerEmail(stripeId, email)
print(result)
except:
print('Fail to sync stripe customer')
def syncEmailToStripe(self, users):
for user in users:
if user['Enabled']:
self.runSyncEmailToStripe(user)
users = getUsers()
wf = workflow(stripeKey)
wf.syncEmailToStripeWorkflow(users)
StripeとCognito User Pools系を操作できるようになれば、有料会員サービスの実装なども比較的やりやすくなるので、みなさんぜひ触ってみてください。
Tools to Support Stripe Development
We provide helpful tools to extend the Stripe Dashboard and streamline development and testing.
View All ToolsRelated Articles
Support This Project
If you find this content helpful, consider supporting the project through GitHub Sponsors. Your support helps maintain and improve these tools.
